diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 649f2506e30a098afa1edd8545e5c2d482b34b04..16f17b596be7a069023a821f83ccd9749fd8fceb 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -16,8 +16,34 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -#pipeline_test +# pipeline_test from __future__ import print_function +import threading +from sets import Set +from job.job_packager import JobPackager +from job.job_exceptions import WrongTemplateException +from platforms.paramiko_submitter import ParamikoSubmitter +from notifications.notifier import Notifier +from notifications.mail_notifier import MailNotifier +from bscearth.utils.date import date2str +from monitor.monitor import Monitor +from database.db_common import get_autosubmit_version +from database.db_common import delete_experiment +from experiment.experiment_common import copy_experiment +from experiment.experiment_common import new_experiment +from database.db_common import create_db +from bscearth.utils.log import Log +from job.job_grouping import JobGrouping +from job.job_list_persistence import JobListPersistencePkl +from job.job_list_persistence import JobListPersistenceDb +from job.job_package_persistence import JobPackagePersistence +from job.job_packages import JobPackageThread +from job.job_list import JobList +from git.autosubmit_git import AutosubmitGit +from job.job_common import Status +from bscearth.utils.config_parser import ConfigParserFactory +from config.config_common import AutosubmitConfig +from config.basicConfig import BasicConfig """ Main module for autosubmit. Only contains an interface class to all functionality implemented on autosubmit @@ -58,35 +84,9 @@ from pyparsing import nestedExpr sys.path.insert(0, os.path.abspath('.')) # noinspection PyPackageRequirements -from config.basicConfig import BasicConfig # noinspection PyPackageRequirements -from config.config_common import AutosubmitConfig -from bscearth.utils.config_parser import ConfigParserFactory -from job.job_common import Status -from git.autosubmit_git import AutosubmitGit -from job.job_list import JobList -from job.job_packages import JobPackageThread -from job.job_package_persistence import JobPackagePersistence -from job.job_list_persistence import JobListPersistenceDb -from job.job_list_persistence import JobListPersistencePkl -from job.job_grouping import JobGrouping # from API.testAPI import Monitor # noinspection PyPackageRequirements -from bscearth.utils.log import Log -from database.db_common import create_db -from experiment.experiment_common import new_experiment -from experiment.experiment_common import copy_experiment -from database.db_common import delete_experiment -from database.db_common import get_autosubmit_version -from monitor.monitor import Monitor -from bscearth.utils.date import date2str -from notifications.mail_notifier import MailNotifier -from notifications.notifier import Notifier -from platforms.paramiko_submitter import ParamikoSubmitter -from job.job_exceptions import WrongTemplateException -from job.job_packager import JobPackager -from sets import Set -import threading # noinspection PyUnusedLocal @@ -100,6 +100,7 @@ def signal_handler(signal_received, frame): Log.info('Autosubmit will interrupt at the next safe occasion') Autosubmit.exit = True + def signal_handler_create(signal_received, frame): """ Used to handle KeyboardInterrumpt signals while the create method is being executed @@ -109,6 +110,7 @@ def signal_handler_create(signal_received, frame): """ Log.info('Autosubmit has been closed in an unexpected way. If problems with your experiment arise, review the FAQ.') + class Autosubmit: """ Interface class for autosubmit. @@ -139,7 +141,8 @@ class Autosubmit: try: BasicConfig.read() - parser = argparse.ArgumentParser(description='Main executable for autosubmit. ') + parser = argparse.ArgumentParser( + description='Main executable for autosubmit. ') parser.add_argument('-v', '--version', action='version', version=Autosubmit.autosubmit_version, help="returns autosubmit's version number and exit") parser.add_argument('-lf', '--logfile', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', @@ -154,15 +157,20 @@ class Autosubmit: subparsers = parser.add_subparsers(dest='command') # Run - subparser = subparsers.add_parser('run', description="runs specified experiment") + subparser = subparsers.add_parser( + 'run', description="runs specified experiment") subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') - subparser.add_argument('-v', '--update_version', action='store_true', default=False, help='Update experiment version') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + subparser.add_argument('-v', '--update_version', action='store_true', + default=False, help='Update experiment version') # Expid - subparser = subparsers.add_parser('expid', description="Creates a new experiment") + subparser = subparsers.add_parser( + 'expid', description="Creates a new experiment") group = subparser.add_mutually_exclusive_group() - group.add_argument('-y', '--copy', help='makes a copy of the specified experiment') + group.add_argument( + '-y', '--copy', help='makes a copy of the specified experiment') group.add_argument('-dm', '--dummy', action='store_true', help='creates a new experiment with default values, usually for testing') group.add_argument('-op', '--operational', action='store_true', @@ -174,29 +182,35 @@ class Autosubmit: subparser.add_argument('-c', '--config', type=str, required=False, help='defines where are located the configuration files.') # Delete - subparser = subparsers.add_parser('delete', description="delete specified experiment") + subparser = subparsers.add_parser( + 'delete', description="delete specified experiment") subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-f', '--force', action='store_true', help='deletes experiment without confirmation') + subparser.add_argument( + '-f', '--force', action='store_true', help='deletes experiment without confirmation') # Monitor - subparser = subparsers.add_parser('monitor', description="plots specified experiment") - subparser.add_argument('expid', help='experiment identifier') + subparser = subparsers.add_parser( + 'monitor', description="plots specified experiment") + subparser.add_argument('expid', help='experiment identifier') subparser.add_argument('-o', '--output', choices=('pdf', 'png', 'ps', 'svg'), - help='chooses type of output for generated plot') ## Default -o value comes from .conf + help='chooses type of output for generated plot') # Default -o value comes from .conf subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, help='Groups the jobs automatically or by date, member, chunk or split') subparser.add_argument('-expand', type=str, - help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". ' - 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') - subparser.add_argument('-expand_status', type=str, help='Select the statuses to be expanded') - subparser.add_argument('--hide_groups', action='store_true', default=False, help='Hides the groups from the plot') - subparser.add_argument('-cw', '--check_wrapper', action='store_true', default=False, help='Generate possible wrapper in the current workflow') - + help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". ' + 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') + subparser.add_argument( + '-expand_status', type=str, help='Select the statuses to be expanded') + subparser.add_argument('--hide_groups', action='store_true', + default=False, help='Hides the groups from the plot') + subparser.add_argument('-cw', '--check_wrapper', action='store_true', + default=False, help='Generate possible wrapper in the current workflow') group2 = subparser.add_mutually_exclusive_group(required=False) group.add_argument('-fs', '--filter_status', type=str, - choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), + choices=('Any', 'READY', 'COMPLETED', + 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), help='Select the original status to filter the list of jobs') group = subparser.add_mutually_exclusive_group(required=False) group.add_argument('-fl', '--list', type=str, @@ -206,23 +220,27 @@ class Autosubmit: help='Supply the list of chunks to filter the list of jobs. Default = "Any". ' 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') group.add_argument('-fs', '--filter_status', type=str, - choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), + choices=('Any', 'READY', 'COMPLETED', + 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), help='Select the original status to filter the list of jobs') group.add_argument('-ft', '--filter_type', type=str, help='Select the job type to filter the list of jobs') subparser.add_argument('--hide', action='store_true', default=False, help='hides plot window') group2.add_argument('--txt', action='store_true', default=False, - help='Generates only txt status file') + help='Generates only txt status file') group2.add_argument('-txtlog', '--txt_logfiles', action='store_true', default=False, - help='Generates only txt status file(AS < 3.12b behaviour)') + help='Generates only txt status file(AS < 3.12b behaviour)') - subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') - subparser.add_argument('-d', '--detail', action='store_true', default=False, help='Shows Job List view in terminal') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + subparser.add_argument('-d', '--detail', action='store_true', + default=False, help='Shows Job List view in terminal') # Stats - subparser = subparsers.add_parser('stats', description="plots statistics for specified experiment") + subparser = subparsers.add_parser( + 'stats', description="plots statistics for specified experiment") subparser.add_argument('expid', help='experiment identifier') subparser.add_argument('-ft', '--filter_type', type=str, help='Select the job type to filter ' 'the list of jobs') @@ -233,24 +251,31 @@ class Autosubmit: help='type of output for generated plot') subparser.add_argument('--hide', action='store_true', default=False, help='hides plot window') - subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') # Clean - subparser = subparsers.add_parser('clean', description="clean specified experiment") + subparser = subparsers.add_parser( + 'clean', description="clean specified experiment") subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-pr', '--project', action="store_true", help='clean project') + subparser.add_argument( + '-pr', '--project', action="store_true", help='clean project') subparser.add_argument('-p', '--plot', action="store_true", help='clean plot, only 2 last will remain') subparser.add_argument('-s', '--stats', action="store_true", help='clean stats, only last will remain') # Recovery - subparser = subparsers.add_parser('recovery', description="recover specified experiment") - subparser.add_argument('expid', type=str, help='experiment identifier') - subparser.add_argument('-np', '--noplot', action='store_true', default=False, help='omit plot') + subparser = subparsers.add_parser( + 'recovery', description="recover specified experiment") + subparser.add_argument( + 'expid', type=str, help='experiment identifier') + subparser.add_argument( + '-np', '--noplot', action='store_true', default=False, help='omit plot') subparser.add_argument('--all', action="store_true", default=False, help='Get completed files to synchronize pkl') - subparser.add_argument('-s', '--save', action="store_true", default=False, help='Save changes to disk') + subparser.add_argument( + '-s', '--save', action="store_true", default=False, help='Save changes to disk') subparser.add_argument('--hide', action='store_true', default=False, help='hides plot window') subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, @@ -258,28 +283,39 @@ class Autosubmit: subparser.add_argument('-expand', type=str, help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". ' 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') - subparser.add_argument('-expand_status', type=str, help='Select the statuses to be expanded') - subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') + subparser.add_argument( + '-expand_status', type=str, help='Select the statuses to be expanded') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') subparser.add_argument('-nl', '--no_recover_logs', action='store_true', default=False, help='Disable logs recovery') - subparser.add_argument('-d', '--detail', action='store_true', default=False, help='Show Job List view in terminal') + subparser.add_argument('-d', '--detail', action='store_true', + default=False, help='Show Job List view in terminal') # Migrate - subparser = subparsers.add_parser('migrate', description="Migrate experiments from current user to another") + subparser = subparsers.add_parser( + 'migrate', description="Migrate experiments from current user to another") subparser.add_argument('expid', help='experiment identifier') group = subparser.add_mutually_exclusive_group(required=True) - group.add_argument('-o', '--offer', action="store_true", default=False, help='Offer experiment') - group.add_argument('-p', '--pickup', action="store_true", default=False, help='Pick-up released experiment') + group.add_argument('-o', '--offer', action="store_true", + default=False, help='Offer experiment') + group.add_argument('-p', '--pickup', action="store_true", + default=False, help='Pick-up released experiment') # Inspect - subparser = subparsers.add_parser('inspect', description="Generate all .cmd files") + subparser = subparsers.add_parser( + 'inspect', description="Generate all .cmd files") subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') - subparser.add_argument('-f', '--force', action="store_true",help='Overwrite all cmd') - subparser.add_argument('-cw', '--check_wrapper', action='store_true', default=False, help='Generate possible wrapper in the current workflow') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + subparser.add_argument( + '-f', '--force', action="store_true", help='Overwrite all cmd') + subparser.add_argument('-cw', '--check_wrapper', action='store_true', + default=False, help='Generate possible wrapper in the current workflow') group.add_argument('-fs', '--filter_status', type=str, - choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), + choices=('Any', 'READY', 'COMPLETED', + 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), help='Select the original status to filter the list of jobs') group = subparser.add_mutually_exclusive_group(required=False) group.add_argument('-fl', '--list', type=str, @@ -289,71 +325,89 @@ class Autosubmit: help='Supply the list of chunks to filter the list of jobs. Default = "Any". ' 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') group.add_argument('-fs', '--filter_status', type=str, - choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), + choices=('Any', 'READY', 'COMPLETED', + 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), help='Select the original status to filter the list of jobs') group.add_argument('-ft', '--filter_type', type=str, help='Select the job type to filter the list of jobs') - # Check - subparser = subparsers.add_parser('check', description="check configuration for specified experiment") + subparser = subparsers.add_parser( + 'check', description="check configuration for specified experiment") subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') # Describe - subparser = subparsers.add_parser('describe', description="Show details for specified experiment") + subparser = subparsers.add_parser( + 'describe', description="Show details for specified experiment") subparser.add_argument('expid', help='experiment identifier') # Create - subparser = subparsers.add_parser('create', description="create specified experiment joblist") + subparser = subparsers.add_parser( + 'create', description="create specified experiment joblist") subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-np', '--noplot', action='store_true', default=False, help='omit plot') + subparser.add_argument( + '-np', '--noplot', action='store_true', default=False, help='omit plot') subparser.add_argument('--hide', action='store_true', default=False, help='hides plot window') - subparser.add_argument('-d', '--detail', action='store_true', default=False, help='Show Job List view in terminal') + subparser.add_argument('-d', '--detail', action='store_true', + default=False, help='Show Job List view in terminal') subparser.add_argument('-o', '--output', choices=('pdf', 'png', 'ps', 'svg'), - help='chooses type of output for generated plot') ## Default -o value comes from .conf + help='chooses type of output for generated plot') # Default -o value comes from .conf subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, help='Groups the jobs automatically or by date, member, chunk or split') subparser.add_argument('-expand', type=str, help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". ' 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') - subparser.add_argument('-expand_status', type=str, help='Select the statuses to be expanded') - subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') - subparser.add_argument('-cw', '--check_wrapper', action='store_true', default=False, help='Generate possible wrapper in the current workflow') + subparser.add_argument( + '-expand_status', type=str, help='Select the statuses to be expanded') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + subparser.add_argument('-cw', '--check_wrapper', action='store_true', + default=False, help='Generate possible wrapper in the current workflow') # Configure subparser = subparsers.add_parser('configure', description="configure database and path for autosubmit. It " "can be done at machine, user or local level." "If no arguments specified configure will " "display dialog boxes (if installed)") - subparser.add_argument('--advanced', action="store_true", help="Open advanced configuration of autosubmit") + subparser.add_argument( + '--advanced', action="store_true", help="Open advanced configuration of autosubmit") subparser.add_argument('-db', '--databasepath', default=None, help='path to database. If not supplied, ' 'it will prompt for it') - subparser.add_argument('-dbf', '--databasefilename', default=None, help='database filename') + subparser.add_argument( + '-dbf', '--databasefilename', default=None, help='database filename') subparser.add_argument('-lr', '--localrootpath', default=None, help='path to store experiments. If not ' 'supplied, it will prompt for it') subparser.add_argument('-pc', '--platformsconfpath', default=None, help='path to platforms.conf file to ' 'use by default. Optional') subparser.add_argument('-jc', '--jobsconfpath', default=None, help='path to jobs.conf file to use by ' 'default. Optional') - subparser.add_argument('-sm', '--smtphostname', default=None, help='STMP server hostname. Optional') - subparser.add_argument('-mf', '--mailfrom', default=None, help='Notifications sender address. Optional') + subparser.add_argument( + '-sm', '--smtphostname', default=None, help='STMP server hostname. Optional') + subparser.add_argument( + '-mf', '--mailfrom', default=None, help='Notifications sender address. Optional') group = subparser.add_mutually_exclusive_group() - group.add_argument('--all', action="store_true", help='configure for all users') + group.add_argument('--all', action="store_true", + help='configure for all users') group.add_argument('--local', action="store_true", help='configure only for using Autosubmit from this ' 'path') # Install - subparsers.add_parser('install', description='install database for autosubmit on the configured folder') + subparsers.add_parser( + 'install', description='install database for autosubmit on the configured folder') # Set status - subparser = subparsers.add_parser('setstatus', description="sets job status for an experiment") + subparser = subparsers.add_parser( + 'setstatus', description="sets job status for an experiment") subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-np', '--noplot', action='store_true', default=False, help='omit plot') - subparser.add_argument('-s', '--save', action="store_true", default=False, help='Save changes to disk') + subparser.add_argument( + '-np', '--noplot', action='store_true', default=False, help='omit plot') + subparser.add_argument( + '-s', '--save', action="store_true", default=False, help='Save changes to disk') subparser.add_argument('-t', '--status_final', choices=('READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN', - 'QUEUING', 'RUNNING','HELD'), + 'QUEUING', 'RUNNING', 'HELD'), required=True, help='Supply the target status') group = subparser.add_mutually_exclusive_group(required=True) @@ -381,48 +435,64 @@ class Autosubmit: subparser.add_argument('-expand', type=str, help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". ' 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') - subparser.add_argument('-expand_status', type=str, help='Select the statuses to be expanded') - subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') - subparser.add_argument('-cw', '--check_wrapper', action='store_true', default=False, help='Generate possible wrapper in the current workflow') - subparser.add_argument('-d', '--detail', action='store_true', default=False, help='Generate detailed view of changes') - + subparser.add_argument( + '-expand_status', type=str, help='Select the statuses to be expanded') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + subparser.add_argument('-cw', '--check_wrapper', action='store_true', + default=False, help='Generate possible wrapper in the current workflow') + subparser.add_argument('-d', '--detail', action='store_true', + default=False, help='Generate detailed view of changes') # Test Case - subparser = subparsers.add_parser('testcase', description='create test case experiment') - subparser.add_argument('-y', '--copy', help='makes a copy of the specified experiment') - subparser.add_argument('-d', '--description', required=True, help='description of the test case') + subparser = subparsers.add_parser( + 'testcase', description='create test case experiment') + subparser.add_argument( + '-y', '--copy', help='makes a copy of the specified experiment') + subparser.add_argument( + '-d', '--description', required=True, help='description of the test case') subparser.add_argument('-c', '--chunks', help='chunks to run') subparser.add_argument('-m', '--member', help='member to run') subparser.add_argument('-s', '--stardate', help='stardate to run') - subparser.add_argument('-H', '--HPC', required=True, help='HPC to run experiment on it') - subparser.add_argument('-b', '--branch', help='branch of git to run (or revision from subversion)') + subparser.add_argument( + '-H', '--HPC', required=True, help='HPC to run experiment on it') + subparser.add_argument( + '-b', '--branch', help='branch of git to run (or revision from subversion)') # Test - subparser = subparsers.add_parser('test', description='test experiment') + subparser = subparsers.add_parser( + 'test', description='test experiment') subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-c', '--chunks', required=True, help='chunks to run') + subparser.add_argument( + '-c', '--chunks', required=True, help='chunks to run') subparser.add_argument('-m', '--member', help='member to run') subparser.add_argument('-s', '--stardate', help='stardate to run') - subparser.add_argument('-H', '--HPC', help='HPC to run experiment on it') - subparser.add_argument('-b', '--branch', help='branch of git to run (or revision from subversion)') + subparser.add_argument( + '-H', '--HPC', help='HPC to run experiment on it') + subparser.add_argument( + '-b', '--branch', help='branch of git to run (or revision from subversion)') # Refresh - subparser = subparsers.add_parser('refresh', description='refresh project directory for an experiment') + subparser = subparsers.add_parser( + 'refresh', description='refresh project directory for an experiment') subparser.add_argument('expid', help='experiment identifier') subparser.add_argument('-mc', '--model_conf', default=False, action='store_true', help='overwrite model conf file') subparser.add_argument('-jc', '--jobs_conf', default=False, action='store_true', help='overwrite jobs conf file') # Update Version - subparser = subparsers.add_parser('updateversion', description='refresh experiment version') + subparser = subparsers.add_parser( + 'updateversion', description='refresh experiment version') subparser.add_argument('expid', help='experiment identifier') # Archive - subparser = subparsers.add_parser('archive', description='archives an experiment') + subparser = subparsers.add_parser( + 'archive', description='archives an experiment') subparser.add_argument('expid', help='experiment identifier') # Unarchive - subparser = subparsers.add_parser('unarchive', description='unarchives an experiment') + subparser = subparsers.add_parser( + 'unarchive', description='unarchives an experiment') subparser.add_argument('expid', help='experiment identifier') # Readme @@ -437,16 +507,16 @@ class Autosubmit: Log.set_file_level(args.logfile) if args.command == 'run': - return Autosubmit.run_experiment(args.expid, args.notransitive,args.update_version) + return Autosubmit.run_experiment(args.expid, args.notransitive, args.update_version) elif args.command == 'expid': return Autosubmit.expid(args.HPC, args.description, args.copy, args.dummy, False, - args.operational,args.config) != '' + args.operational, args.config) != '' elif args.command == 'delete': return Autosubmit.delete(args.expid, args.force) elif args.command == 'monitor': return Autosubmit.monitor(args.expid, args.output, args.list, args.filter_chunks, args.filter_status, args.filter_type, args.hide, args.txt, args.group_by, args.expand, - args.expand_status, args.hide_groups, args.notransitive,args.check_wrapper,args.txt_logfiles, args.detail) + args.expand_status, args.hide_groups, args.notransitive, args.check_wrapper, args.txt_logfiles, args.detail) elif args.command == 'stats': return Autosubmit.statistics(args.expid, args.filter_type, args.filter_period, args.output, args.hide, args.notransitive) @@ -454,19 +524,19 @@ class Autosubmit: return Autosubmit.clean(args.expid, args.project, args.plot, args.stats) elif args.command == 'recovery': return Autosubmit.recovery(args.expid, args.noplot, args.save, args.all, args.hide, args.group_by, - args.expand, args.expand_status, args.notransitive,args.no_recover_logs, args.detail) + args.expand, args.expand_status, args.notransitive, args.no_recover_logs, args.detail) elif args.command == 'check': return Autosubmit.check(args.expid, args.notransitive) elif args.command == 'inspect': return Autosubmit.inspect(args.expid, args.list, args.filter_chunks, args.filter_status, - args.filter_type,args.notransitive , args.force,args.check_wrapper) + args.filter_type, args.notransitive, args.force, args.check_wrapper) elif args.command == 'describe': return Autosubmit.describe(args.expid) elif args.command == 'migrate': return Autosubmit.migrate(args.expid, args.offer, args.pickup) elif args.command == 'create': return Autosubmit.create(args.expid, args.noplot, args.hide, args.output, args.group_by, args.expand, - args.expand_status, args.notransitive,args.check_wrapper, args.detail) + args.expand_status, args.notransitive, args.check_wrapper, args.detail) elif args.command == 'configure': if not args.advanced or (args.advanced and dialog is None): return Autosubmit.configure(args.advanced, args.databasepath, args.databasefilename, @@ -479,7 +549,7 @@ class Autosubmit: elif args.command == 'setstatus': return Autosubmit.set_status(args.expid, args.noplot, args.save, args.status_final, args.list, args.filter_chunks, args.filter_status, args.filter_type, args.filter_type_chunk, args.hide, - args.group_by, args.expand, args.expand_status, args.notransitive,args.check_wrapper, args.detail) + args.group_by, args.expand, args.expand_status, args.notransitive, args.check_wrapper, args.detail) elif args.command == 'testcase': return Autosubmit.testcase(args.copy, args.description, args.chunks, args.member, args.stardate, args.HPC, args.branch) @@ -508,7 +578,8 @@ class Autosubmit: return False except Exception as e: from traceback import format_exc - Log.critical('Unhandled exception on Autosubmit: {0}\n{1}', e, format_exc(10)) + Log.critical( + 'Unhandled exception on Autosubmit: {0}\n{1}', e, format_exc(10)) return False @@ -517,11 +588,13 @@ class Autosubmit: BasicConfig.read() #currentUser_id = os.getlogin() currentUser_id = pwd.getpwuid(os.getuid())[0] - currentOwner_id = pwd.getpwuid(os.stat(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid)).st_uid).pw_name + currentOwner_id = pwd.getpwuid(os.stat(os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid)).st_uid).pw_name if currentUser_id == currentOwner_id: return True else: return False + @staticmethod def _delete_expid(expid_delete, force): """ @@ -551,22 +624,27 @@ class Autosubmit: currentOwner_id = 0 currentOwner = "empty" try: - currentOwner = os.stat(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid_delete)).st_uid - currentOwner_id = pwd.getpwuid(os.stat(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid_delete)).st_uid).pw_name + currentOwner = os.stat(os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid_delete)).st_uid + currentOwner_id = pwd.getpwuid(os.stat(os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid_delete)).st_uid).pw_name except: pass finally: if currentOwner_id == 0: - Log.info("Current owner '{0}' of experiment {1} does not exist anymore.", currentOwner, expid_delete) + Log.info( + "Current owner '{0}' of experiment {1} does not exist anymore.", currentOwner, expid_delete) - # Deletion workflow continues as usual, a disjunction is included for the case when + # Deletion workflow continues as usual, a disjunction is included for the case when # force is sent, and user is eadmin if currentOwner_id == os.getlogin() or (force and my_user == id_eadmin): if (force and my_user == id_eadmin): - Log.info("Preparing deletion of experiment {0} from owner: {1}, as eadmin.", expid_delete, currentOwner) + Log.info( + "Preparing deletion of experiment {0} from owner: {1}, as eadmin.", expid_delete, currentOwner) try: Log.info("Removing experiment directory...") - shutil.rmtree(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid_delete)) + shutil.rmtree(os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid_delete)) except OSError as e: Log.warning('Can not delete experiment folder: {0}', e) return ret @@ -576,9 +654,11 @@ class Autosubmit: Log.result("Experiment {0} deleted".format(expid_delete)) else: if currentOwner_id == 0: - Log.critical("Detected Eadmin user however, -f flag is not found. {0} can not be deleted!",expid_delete) + Log.critical( + "Detected Eadmin user however, -f flag is not found. {0} can not be deleted!", expid_delete) else: - Log.critical("Current user is not the owner of the experiment. {0} can not be deleted!",expid_delete) + Log.critical( + "Current user is not the owner of the experiment. {0} can not be deleted!", expid_delete) return ret @staticmethod @@ -602,11 +682,13 @@ class Autosubmit: """ BasicConfig.read() - log_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, 'ASlogs', 'expid.log'.format(os.getuid())) + log_path = os.path.join( + BasicConfig.LOCAL_ROOT_DIR, 'ASlogs', 'expid.log'.format(os.getuid())) try: Log.set_file(log_path) except IOError as e: - Log.error("Can not create log file in path {0}: {1}".format(log_path, e.message)) + Log.error("Can not create log file in path {0}: {1}".format( + log_path, e.message)) exp_id = None if description is None: Log.error("Missing experiment description.") @@ -615,13 +697,15 @@ class Autosubmit: Log.error("Missing HPC.") return '' if not copy_id: - exp_id = new_experiment(description, Autosubmit.autosubmit_version, test, operational) + exp_id = new_experiment( + description, Autosubmit.autosubmit_version, test, operational) if exp_id == '': return '' try: os.mkdir(os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id)) - os.mkdir(os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id, 'conf')) + os.mkdir(os.path.join( + BasicConfig.LOCAL_ROOT_DIR, exp_id, 'conf')) Log.info("Copying config files...") # autosubmit config and experiment copied from AS. @@ -629,32 +713,41 @@ class Autosubmit: for filename in files: if resource_exists('autosubmit.config', 'files/' + filename): index = filename.index('.') - new_filename = filename[:index] + "_" + exp_id + filename[index:] + new_filename = filename[:index] + \ + "_" + exp_id + filename[index:] if filename == 'platforms.conf' and BasicConfig.DEFAULT_PLATFORMS_CONF != '': - content = open(os.path.join(BasicConfig.DEFAULT_PLATFORMS_CONF, filename)).read() + content = open(os.path.join( + BasicConfig.DEFAULT_PLATFORMS_CONF, filename)).read() elif filename == 'jobs.conf' and BasicConfig.DEFAULT_JOBS_CONF != '': - content = open(os.path.join(BasicConfig.DEFAULT_JOBS_CONF, filename)).read() + content = open(os.path.join( + BasicConfig.DEFAULT_JOBS_CONF, filename)).read() else: - content = resource_string('autosubmit.config', 'files/' + filename) - + content = resource_string( + 'autosubmit.config', 'files/' + filename) + # If autosubmitrc [conf] custom_platforms has been set and file exists, replace content if filename.startswith("platforms") and os.path.isfile(BasicConfig.CUSTOM_PLATFORMS_PATH): - content = open(BasicConfig.CUSTOM_PLATFORMS_PATH, 'r').read() + content = open( + BasicConfig.CUSTOM_PLATFORMS_PATH, 'r').read() - conf_new_filename = os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id, "conf", new_filename) + conf_new_filename = os.path.join( + BasicConfig.LOCAL_ROOT_DIR, exp_id, "conf", new_filename) Log.debug(conf_new_filename) open(conf_new_filename, 'w').write(content) - Autosubmit._prepare_conf_files(exp_id, hpc, Autosubmit.autosubmit_version, dummy) + Autosubmit._prepare_conf_files( + exp_id, hpc, Autosubmit.autosubmit_version, dummy) except (OSError, IOError) as e: - Log.error("Can not create experiment: {0}\nCleaning...".format(e)) + Log.error( + "Can not create experiment: {0}\nCleaning...".format(e)) Autosubmit._delete_expid(exp_id) return '' else: # copy_id has been set by the user try: if root_folder == '' or root_folder is None: - root_folder=os.path.join(BasicConfig.LOCAL_ROOT_DIR, copy_id) + root_folder = os.path.join( + BasicConfig.LOCAL_ROOT_DIR, copy_id) if os.path.exists(root_folder): # List of allowed files from conf conf_copy_filter_folder = [] @@ -665,52 +758,67 @@ class Autosubmit: "proj_" + str(copy_id) + ".conf"] if root_folder != os.path.join(BasicConfig.LOCAL_ROOT_DIR, copy_id): conf_copy_filter_folder = ["autosubmit.conf", - "expdef.conf", - "jobs.conf", - "platforms.conf", - "proj.conf"] - exp_id = new_experiment(description, Autosubmit.autosubmit_version, test, operational) + "expdef.conf", + "jobs.conf", + "platforms.conf", + "proj.conf"] + exp_id = new_experiment( + description, Autosubmit.autosubmit_version, test, operational) else: - exp_id = copy_experiment(copy_id, description, Autosubmit.autosubmit_version, test, operational) + exp_id = copy_experiment( + copy_id, description, Autosubmit.autosubmit_version, test, operational) if exp_id == '': return '' - dir_exp_id = os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id) + dir_exp_id = os.path.join( + BasicConfig.LOCAL_ROOT_DIR, exp_id) os.mkdir(dir_exp_id) os.mkdir(dir_exp_id + '/conf') if root_folder == os.path.join(BasicConfig.LOCAL_ROOT_DIR, copy_id): - Log.info("Copying previous experiment config directories") - conf_copy_id = os.path.join(BasicConfig.LOCAL_ROOT_DIR, copy_id, "conf") + Log.info( + "Copying previous experiment config directories") + conf_copy_id = os.path.join( + BasicConfig.LOCAL_ROOT_DIR, copy_id, "conf") else: - Log.info("Copying from folder: {0}",root_folder) + Log.info("Copying from folder: {0}", root_folder) conf_copy_id = root_folder files = os.listdir(conf_copy_id) for filename in files: # Allow only those files in the list if filename in conf_copy_filter: if os.path.isfile(os.path.join(conf_copy_id, filename)): - new_filename = filename.replace(copy_id, exp_id) - content = open(os.path.join(conf_copy_id, filename), 'r').read() - + new_filename = filename.replace( + copy_id, exp_id) + content = open(os.path.join( + conf_copy_id, filename), 'r').read() + # If autosubmitrc [conf] custom_platforms has been set and file exists, replace content if filename.startswith("platforms") and os.path.isfile(BasicConfig.CUSTOM_PLATFORMS_PATH): - content = open(BasicConfig.CUSTOM_PLATFORMS_PATH, 'r').read() + content = open( + BasicConfig.CUSTOM_PLATFORMS_PATH, 'r').read() - open(os.path.join(dir_exp_id, "conf", new_filename), 'w').write(content) + open(os.path.join(dir_exp_id, "conf", + new_filename), 'w').write(content) if filename in conf_copy_filter_folder: if os.path.isfile(os.path.join(conf_copy_id, filename)): - new_filename = filename.split(".")[0]+"_"+exp_id+".conf" - content = open(os.path.join(conf_copy_id, filename), 'r').read() + new_filename = filename.split( + ".")[0]+"_"+exp_id+".conf" + content = open(os.path.join( + conf_copy_id, filename), 'r').read() # If autosubmitrc [conf] custom_platforms has been set and file exists, replace content if filename.startswith("platforms") and os.path.isfile( BasicConfig.CUSTOM_PLATFORMS_PATH): - content = open(BasicConfig.CUSTOM_PLATFORMS_PATH, 'r').read() + content = open( + BasicConfig.CUSTOM_PLATFORMS_PATH, 'r').read() - open(os.path.join(dir_exp_id, "conf", new_filename), 'w').write(content) + open(os.path.join(dir_exp_id, "conf", + new_filename), 'w').write(content) - Autosubmit._prepare_conf_files(exp_id, hpc, Autosubmit.autosubmit_version, dummy) + Autosubmit._prepare_conf_files( + exp_id, hpc, Autosubmit.autosubmit_version, dummy) ##### - autosubmit_config = AutosubmitConfig(copy_id, BasicConfig, ConfigParserFactory()) + autosubmit_config = AutosubmitConfig( + copy_id, BasicConfig, ConfigParserFactory()) if autosubmit_config.check_conf_files(): project_type = autosubmit_config.get_project_type() if project_type == "git": @@ -721,11 +829,13 @@ class Autosubmit: return False ##### else: - Log.critical("The previous experiment directory does not exist") + Log.critical( + "The previous experiment directory does not exist") return '' except (OSError, IOError) as e: - Log.error("Can not create experiment: {0}\nCleaning...".format(e)) - Autosubmit._delete_expid(exp_id,True) + Log.error( + "Can not create experiment: {0}\nCleaning...".format(e)) + Autosubmit._delete_expid(exp_id, True) return '' Log.debug("Creating temporal directory...") @@ -733,14 +843,13 @@ class Autosubmit: tmp_path = os.path.join(exp_id_path, "tmp") os.mkdir(tmp_path) os.chmod(tmp_path, 0o775) - os.mkdir(os.path.join(tmp_path , BasicConfig.LOCAL_ASLOG_DIR)) - os.chmod(os.path.join(tmp_path , BasicConfig.LOCAL_ASLOG_DIR), 0o755) + os.mkdir(os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR)) + os.chmod(os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR), 0o755) Log.debug("Creating temporal remote directory...") - remote_tmp_path = os.path.join(tmp_path,"LOG_"+exp_id) + remote_tmp_path = os.path.join(tmp_path, "LOG_"+exp_id) os.mkdir(remote_tmp_path) os.chmod(remote_tmp_path, 0o755) - Log.debug("Creating pkl directory...") os.mkdir(os.path.join(exp_id_path, "pkl")) @@ -755,11 +864,16 @@ class Autosubmit: os.chmod(os.path.join(exp_id_path, "pkl"), 0o755) os.chmod(os.path.join(exp_id_path, "tmp"), 0o755) os.chmod(os.path.join(exp_id_path, "plot"), 0o755) - os.chmod(os.path.join(exp_id_path, "conf/autosubmit_" + str(exp_id) + ".conf"), 0o644) - os.chmod(os.path.join(exp_id_path, "conf/expdef_" + str(exp_id) + ".conf"), 0o644) - os.chmod(os.path.join(exp_id_path, "conf/jobs_" + str(exp_id) + ".conf"), 0o644) - os.chmod(os.path.join(exp_id_path, "conf/platforms_" + str(exp_id) + ".conf"), 0o644) - os.chmod(os.path.join(exp_id_path, "conf/proj_" + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/autosubmit_" + + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/expdef_" + + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/jobs_" + + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/platforms_" + + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/proj_" + + str(exp_id) + ".conf"), 0o644) except: pass return exp_id @@ -778,11 +892,13 @@ class Autosubmit: :rtype: bool """ - log_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'delete.log'.format(os.getuid())) + log_path = os.path.join( + BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'delete.log'.format(os.getuid())) try: Log.set_file(log_path) except IOError as e: - Log.error("Can not create log file in path {0}: {1}".format(log_path, e.message)) + Log.error("Can not create log file in path {0}: {1}".format( + log_path, e.message)) if os.path.exists(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid)): if force or Autosubmit._user_yes_no_query("Do you want to delete " + expid + " ?"): @@ -820,8 +936,9 @@ class Autosubmit: platform.add_parameters(parameters, True) # Attach paramenters to JobList job_list.parameters = parameters + @staticmethod - def inspect(expid, lst, filter_chunks, filter_status, filter_section , notransitive=False, force=False, check_wrapper=False): + def inspect(expid, lst, filter_chunks, filter_status, filter_section, notransitive=False, force=False, check_wrapper=False): """ Generates cmd files experiment. @@ -836,21 +953,24 @@ class Autosubmit: BasicConfig.read() if not Autosubmit._check_Ownership(expid): - Log.critical('Can not inspect the experiment {0} because you are not the owner',expid) + Log.critical( + 'Can not inspect the experiment {0} because you are not the owner', expid) return False exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) if os.path.exists(os.path.join(tmp_path, 'autosubmit.lock')): - locked=True + locked = True else: - locked=False + locked = False if not os.path.exists(exp_path): - Log.critical("The directory %s is needed and does not exist" % exp_path) + Log.critical( + "The directory %s is needed and does not exist" % exp_path) Log.warning("Does an experiment with the given id exist?") return 1 Log.info("Starting inspect command") - Log.set_file(os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR, 'generate.log')) + Log.set_file(os.path.join( + tmp_path, BasicConfig.LOCAL_ASLOG_DIR, 'generate.log')) os.system('clear') signal.signal(signal.SIGINT, signal_handler) as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) @@ -866,10 +986,12 @@ class Autosubmit: Log.debug("Sleep: {0}", safetysleeptime) packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0644) + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, + "pkl", "job_packages_" + expid + ".db"), 0o644) packages_persistence.reset_table(True) - job_list_original = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) + job_list_original = Autosubmit.load_job_list( + expid, as_conf, notransitive=notransitive) job_list = copy.deepcopy(job_list_original) job_list.packages_dict = {} @@ -878,23 +1000,24 @@ class Autosubmit: # variables to be updated on the fly safetysleeptime = as_conf.get_safetysleeptime() Log.debug("Sleep: {0}", safetysleeptime) - #Generate + # Generate Log.info("Starting to generate cmd scripts") if not isinstance(job_list, type([])): jobs = [] jobs_cw = [] - if check_wrapper and ( not locked or (force and locked)): + if check_wrapper and (not locked or (force and locked)): Log.info("Generating all cmd script adapted for wrappers") jobs = job_list.get_uncompleted() - + jobs_cw = job_list.get_completed() else: - if (force and not locked) or (force and locked) : + if (force and not locked) or (force and locked): Log.info("Overwritting all cmd scripts") jobs = job_list.get_job_list() elif locked: - Log.warning("There is a .lock file and not -f, generating only all unsubmitted cmd scripts") + Log.warning( + "There is a .lock file and not -f, generating only all unsubmitted cmd scripts") jobs = job_list.get_unsubmitted() else: Log.info("Generating cmd scripts only for selected jobs") @@ -908,23 +1031,29 @@ class Autosubmit: data = json.loads(Autosubmit._create_json(fc)) for date_json in data['sds']: date = date_json['sd'] - jobs_date = filter(lambda j: date2str(j.date) == date, job_list.get_job_list()) + jobs_date = filter(lambda j: date2str( + j.date) == date, job_list.get_job_list()) for member_json in date_json['ms']: member = member_json['m'] - jobs_member = filter(lambda j: j.member == member, jobs_date) + jobs_member = filter( + lambda j: j.member == member, jobs_date) for chunk_json in member_json['cs']: chunk = int(chunk_json) - jobs = jobs + [job for job in filter(lambda j: j.chunk == chunk, jobs_member)] + jobs = jobs + \ + [job for job in filter( + lambda j: j.chunk == chunk, jobs_member)] elif filter_status: - Log.debug("Filtering jobs with status {0}", filter_status) + Log.debug( + "Filtering jobs with status {0}", filter_status) if filter_status == 'Any': jobs = job_list.get_job_list() else: fs = Autosubmit._get_status(filter_status) - jobs = [job for job in filter(lambda j: j.status == fs, job_list.get_job_list())] + jobs = [job for job in filter( + lambda j: j.status == fs, job_list.get_job_list())] elif filter_section: ft = filter_section @@ -936,7 +1065,7 @@ class Autosubmit: for job in job_list.get_job_list(): if job.section == ft: jobs.append(job) -#TOERASE +# TOERASE elif lst: jobs_lst = lst.split() @@ -959,10 +1088,11 @@ class Autosubmit: referenced_jobs_to_remove.add(parent) for job in jobs: - job.status=Status.WAITING + job.status = Status.WAITING - Autosubmit.generate_scripts_andor_wrappers(as_conf,job_list, jobs,packages_persistence,False) - if len(jobs_cw) >0: + Autosubmit.generate_scripts_andor_wrappers( + as_conf, job_list, jobs, packages_persistence, False) + if len(jobs_cw) > 0: referenced_jobs_to_remove = set() for job in jobs_cw: for child in job.children: @@ -974,14 +1104,15 @@ class Autosubmit: for job in jobs_cw: job.status = Status.WAITING - Autosubmit.generate_scripts_andor_wrappers(as_conf, job_list, jobs_cw,packages_persistence,False) + Autosubmit.generate_scripts_andor_wrappers( + as_conf, job_list, jobs_cw, packages_persistence, False) Log.info("no more scripts to generate, now proceed to check them manually") time.sleep(safetysleeptime) return True @staticmethod - def generate_scripts_andor_wrappers(as_conf,job_list,jobs_filtered,packages_persistence,only_wrappers=False): + def generate_scripts_andor_wrappers(as_conf, job_list, jobs_filtered, packages_persistence, only_wrappers=False): """ :param as_conf: Class that handles basic configuration parameters of Autosubmit. \n :type as_conf: AutosubmitConfig() Object \n @@ -996,7 +1127,7 @@ class Autosubmit: :return: Nothing\n :rtype: \n """ - job_list._job_list=jobs_filtered + job_list._job_list = jobs_filtered # Current choice is Paramiko Submitter submitter = Autosubmit._get_submitter(as_conf) @@ -1005,12 +1136,12 @@ class Autosubmit: # The value is retrieved from DEFAULT.HPCARCH hpcarch = as_conf.get_platform() Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - platforms_to_test = set() + platforms_to_test = set() for job in job_list.get_job_list(): if job.platform_name is None: job.platform_name = hpcarch # Assign platform objects to each job - # noinspection PyTypeChecker + # noinspection PyTypeChecker job.platform = submitter.platforms[job.platform_name.lower()] # Add object to set # noinspection PyTypeChecker @@ -1022,13 +1153,12 @@ class Autosubmit: Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) while job_list.get_active(): # Sending only_wrappers = True - Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,True,only_wrappers,hold=False) - job_list.update_list(as_conf, False,False) - - + Autosubmit.submit_ready_jobs( + as_conf, job_list, platforms_to_test, packages_persistence, True, only_wrappers, hold=False) + job_list.update_list(as_conf, False, False) @staticmethod - def run_experiment(expid, notransitive=False,update_version=False): + def run_experiment(expid, notransitive=False, update_version=False): """ Runs and experiment (submitting all the jobs properly and repeating its execution in case of failure). @@ -1041,16 +1171,18 @@ class Autosubmit: Log.critical("Missing experiment id") BasicConfig.read() if not Autosubmit._check_Ownership(expid): - Log.critical('Can not run the experiment {0} because you are not the owner',expid) + Log.critical( + 'Can not run the experiment {0} because you are not the owner', expid) return False exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) aslogs_path = os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR) if not os.path.exists(aslogs_path): os.mkdir(aslogs_path) - os.chmod(aslogs_path,0o755) + os.chmod(aslogs_path, 0o755) if not os.path.exists(exp_path): - Log.critical("The directory %s is needed and does not exist" % exp_path) + Log.critical( + "The directory %s is needed and does not exist" % exp_path) Log.warning("Does an experiment with the given id exist?") return 1 @@ -1064,25 +1196,29 @@ class Autosubmit: if not as_conf.check_conf_files(): Log.critical('Can not run with invalid configuration') return False - Log.info("Autosubmit is running with {0}",Autosubmit.autosubmit_version) + Log.info( + "Autosubmit is running with {0}", Autosubmit.autosubmit_version) if update_version: if as_conf.get_version() != Autosubmit.autosubmit_version: - Log.info("The {2} experiment {0} version is being updated to {1} for match autosubmit version",as_conf.get_version(),Autosubmit.autosubmit_version,expid) + Log.info("The {2} experiment {0} version is being updated to {1} for match autosubmit version", + as_conf.get_version(), Autosubmit.autosubmit_version, expid) as_conf.set_version(Autosubmit.autosubmit_version) else: if as_conf.get_version() != '' and as_conf.get_version() != Autosubmit.autosubmit_version: Log.critical("Current experiment uses ({0}) which is not the running Autosubmit version \nPlease, update the experiment version if you wish to continue using AutoSubmit {1}\nYou can achieve this using the command autosubmit updateversion {2} \n" - "Or with the -v parameter: autosubmit run {2} -v ",as_conf.get_version(),Autosubmit.autosubmit_version,expid) + "Or with the -v parameter: autosubmit run {2} -v ", as_conf.get_version(), Autosubmit.autosubmit_version, expid) return 1 # checking if there is a lock file to avoid multiple running on the same expid try: with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): - Log.info("Preparing .lock file to avoid multiple instances with same experiment id") + Log.info( + "Preparing .lock file to avoid multiple instances with same experiment id") Log.set_file(os.path.join(aslogs_path, 'run.log')) os.system('clear') signal.signal(signal.SIGINT, signal_handler) - as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) + as_conf = AutosubmitConfig( + expid, BasicConfig, ConfigParserFactory()) if not as_conf.check_conf_files(): Log.critical('Can not run with invalid configuration') return False @@ -1099,11 +1235,15 @@ class Autosubmit: Log.debug("Sleep: {0}", safetysleeptime) Log.debug("Default retrials: {0}", retrials) Log.info("Starting job submission...") - pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') - job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) - Log.debug("Starting from job list restored from {0} files", pkl_dir) + pkl_dir = os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') + job_list = Autosubmit.load_job_list( + expid, as_conf, notransitive=notransitive) + Log.debug( + "Starting from job list restored from {0} files", pkl_dir) Log.debug("Length of the jobs list: {0}", len(job_list)) - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + Autosubmit._load_parameters( + as_conf, job_list, submitter.platforms) # check the job list script creation Log.debug("Checking experiment templates...") platforms_to_test = set() @@ -1111,19 +1251,22 @@ class Autosubmit: if job.platform_name is None: job.platform_name = hpcarch # noinspection PyTypeChecker - job.platform = submitter.platforms[job.platform_name.lower()] + job.platform = submitter.platforms[job.platform_name.lower( + )] # noinspection PyTypeChecker platforms_to_test.add(job.platform) job_list.check_scripts(as_conf) packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid) + "job_packages_" + expid) if as_conf.get_wrapper_type() != 'none': - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl","job_packages_" + expid+".db"), 0644) + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, + expid, "pkl", "job_packages_" + expid+".db"), 0o644) packages = packages_persistence.load() for (exp_id, package_name, job_name) in packages: if package_name not in job_list.packages_dict: job_list.packages_dict[package_name] = [] - job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name)) + job_list.packages_dict[package_name].append( + job_list.get_job_by_name(job_name)) for package_name, jobs in job_list.packages_dict.items(): from job.job import WrapperJob wrapper_job = WrapperJob(package_name, jobs[0].id, Status.SUBMITTED, 0, jobs, @@ -1132,7 +1275,8 @@ class Autosubmit: job_list.job_package_map[jobs[0].id] = wrapper_job job_list.update_list(as_conf) job_list.save() - Log.info("Autosubmit is running with v{0}", Autosubmit.autosubmit_version) + Log.info( + "Autosubmit is running with v{0}", Autosubmit.autosubmit_version) ######################### # AUTOSUBMIT - MAIN LOOP ######################### @@ -1141,7 +1285,8 @@ class Autosubmit: # reload parameters changes Log.debug("Reloading parameters...") as_conf.reload() - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + Autosubmit._load_parameters( + as_conf, job_list, submitter.platforms) # variables to be updated on the fly total_jobs = len(job_list.get_job_list()) Log.info( @@ -1153,46 +1298,56 @@ class Autosubmit: check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time() Log.debug("Sleep: {0}", safetysleeptime) Log.debug("Number of retrials: {0}", default_retrials) - Log.debug('WRAPPER CHECK TIME = {0}'.format(check_wrapper_jobs_sleeptime)) + Log.debug('WRAPPER CHECK TIME = {0}'.format( + check_wrapper_jobs_sleeptime)) save = False slurm = [] for platform in platforms_to_test: list_jobid = "" completed_joblist = [] list_prevStatus = [] - queuing_jobs = job_list.get_in_queue_grouped_id(platform) + queuing_jobs = job_list.get_in_queue_grouped_id( + platform) for job_id, job in queuing_jobs.items(): # Check Wrappers one-by-one if job_list.job_package_map and job_id in job_list.job_package_map: - Log.debug('Checking wrapper job with id ' + str(job_id)) + Log.debug( + 'Checking wrapper job with id ' + str(job_id)) wrapper_job = job_list.job_package_map[job_id] if as_conf.get_notifications() == 'true': for inner_job in wrapper_job.job_list: - inner_job.prev_status= inner_job.status + inner_job.prev_status = inner_job.status check_wrapper = True if wrapper_job.status == Status.RUNNING: - check_wrapper = True if datetime.timedelta.total_seconds(datetime.datetime.now() - wrapper_job.checked_time) >= check_wrapper_jobs_sleeptime else False + check_wrapper = True if datetime.timedelta.total_seconds(datetime.datetime.now( + ) - wrapper_job.checked_time) >= check_wrapper_jobs_sleeptime else False if check_wrapper: wrapper_job.checked_time = datetime.datetime.now() - platform.check_job(wrapper_job) # This is where wrapper will be checked on the slurm platform, update takes place. + # This is where wrapper will be checked on the slurm platform, update takes place. + platform.check_job(wrapper_job) try: if wrapper_job.status != wrapper_job.new_status: Log.info( - 'Wrapper job ' + wrapper_job.name + ' changed from ' + str(Status.VALUE_TO_KEY[wrapper_job.status]) + ' to status ' + str(Status.VALUE_TO_KEY[wrapper_job.new_status]) ) + 'Wrapper job ' + wrapper_job.name + ' changed from ' + str(Status.VALUE_TO_KEY[wrapper_job.status]) + ' to status ' + str(Status.VALUE_TO_KEY[wrapper_job.new_status])) except: - Log.critical("Status Is UNKNOWN, (NONE) exiting autosubmit") + Log.critical( + "Status Is UNKNOWN, (NONE) exiting autosubmit") exit(1) - wrapper_job.check_status(wrapper_job.new_status) # New status will be saved and inner_jobs will be checked. + # New status will be saved and inner_jobs will be checked. + wrapper_job.check_status( + wrapper_job.new_status) # Erase from packages if the wrapper failed to be queued ( Hold Admin bug ) if wrapper_job.status == Status.WAITING: for inner_job in wrapper_job.job_list: inner_job.packed = False - job_list.job_package_map.pop(job_id, None) - job_list.packages_dict.pop(job_id, None) + job_list.job_package_map.pop( + job_id, None) + job_list.packages_dict.pop( + job_id, None) save = True - #Notifications e-mail + # Notifications e-mail if as_conf.get_notifications() == 'true': for inner_job in wrapper_job.job_list: if inner_job.prev_status != inner_job.status: @@ -1201,16 +1356,16 @@ class Autosubmit: Status.VALUE_TO_KEY[inner_job.prev_status], Status.VALUE_TO_KEY[inner_job.status], as_conf.get_mails_to()) - else: # Prepare jobs, if slurm check all active jobs at once. + else: # Prepare jobs, if slurm check all active jobs at once. job = job[0] prev_status = job.status if job.status == Status.FAILED: continue - if platform.type == "slurm": # List for add all jobs that will be checked + if platform.type == "slurm": # List for add all jobs that will be checked list_jobid += str(job_id) + ',' list_prevStatus.append(prev_status) completed_joblist.append(job) - else: # If they're not from slurm platform check one-by-one + else: # If they're not from slurm platform check one-by-one platform.check_job(job) if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'): if as_conf.get_notifications() == 'true': @@ -1222,14 +1377,16 @@ class Autosubmit: save = True if platform.type == "slurm" and list_jobid != "": - slurm.append([platform,list_jobid,list_prevStatus,completed_joblist]) - #END Normal jobs + wrappers - #CHECK ALL JOBS at once if they're from slurm ( wrappers non contempled) + slurm.append( + [platform, list_jobid, list_prevStatus, completed_joblist]) + # END Normal jobs + wrappers + # CHECK ALL JOBS at once if they're from slurm ( wrappers non contempled) for platform_jobs in slurm: platform = platform_jobs[0] jobs_to_check = platform_jobs[1] - platform.check_Alljobs(platform_jobs[3],jobs_to_check,as_conf.get_copy_remote_logs()) - for j_Indx in xrange(0,len(platform_jobs[3])): + platform.check_Alljobs( + platform_jobs[3], jobs_to_check, as_conf.get_copy_remote_logs()) + for j_Indx in xrange(0, len(platform_jobs[3])): prev_status = platform_jobs[2][j_Indx] job = platform_jobs[3][j_Indx] if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'): @@ -1240,13 +1397,15 @@ class Autosubmit: Status.VALUE_TO_KEY[job.status], as_conf.get_mails_to()) save = True - #End Check Current jobs + # End Check Current jobs save2 = job_list.update_list(as_conf) if save or save2: job_list.save() - Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,hold=False) + Autosubmit.submit_ready_jobs( + as_conf, job_list, platforms_to_test, packages_persistence, hold=False) if as_conf.get_remote_dependencies() and len(job_list.get_ready(hold=True)) > 0: - Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,hold=True) + Autosubmit.submit_ready_jobs( + as_conf, job_list, platforms_to_test, packages_persistence, hold=True) save = job_list.update_list(as_conf) if save: job_list.save() @@ -1257,11 +1416,11 @@ class Autosubmit: time.sleep(safetysleeptime) Log.info("No more jobs to run.") - timeout=0 + timeout = 0 for platform in platforms_to_test: platform.closeConnection() active_threads = True - all_threads=threading.enumerate() + all_threads = threading.enumerate() while active_threads and timeout < 360: active_threads = False threads_active = 0 @@ -1269,10 +1428,10 @@ class Autosubmit: if "Thread-" in thread.name: if thread.isAlive(): active_threads = True - threads_active= threads_active+1 - Log.warning("{0}",threads_active) + threads_active = threads_active+1 + Log.warning("{0}", threads_active) sleep(10) - timeout=10+timeout + timeout = 10+timeout if len(job_list.get_failed()) > 0: Log.info("Some jobs have failed and reached maximum retrials") return False @@ -1288,7 +1447,7 @@ class Autosubmit: @staticmethod def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False, - only_wrappers=False,hold=False): + only_wrappers=False, hold=False): """ Gets READY jobs and send them to the platforms if there is available space on the queues @@ -1309,8 +1468,10 @@ class Autosubmit: """ save = False for platform in platforms_to_test: - Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform, hold=hold)), platform.name) - packages_to_submit = JobPackager(as_conf, platform, job_list, hold=hold).build_packages() + Log.debug("\nJobs ready for {1}: {0}", len( + job_list.get_ready(platform, hold=hold)), platform.name) + packages_to_submit = JobPackager( + as_conf, platform, job_list, hold=hold).build_packages() if not inspect: platform.open_submit_script() @@ -1325,7 +1486,8 @@ class Autosubmit: # If called from RUN or inspect command if not only_wrappers: try: - package.submit(as_conf, job_list.parameters, inspect, hold=hold) + package.submit( + as_conf, job_list.parameters, inspect, hold=hold) valid_packages_to_submit.append(package) except (IOError, OSError): continue @@ -1340,13 +1502,16 @@ class Autosubmit: if isinstance(package, JobPackageThread): # If it is instance of JobPackageThread, then it is JobPackageVertical. - packages_persistence.save(package.name, package.jobs, package._expid, inspect) + packages_persistence.save( + package.name, package.jobs, package._expid, inspect) save = True except WrongTemplateException as e: - Log.error("Invalid parameter substitution in {0} template", e.job_name) + Log.error( + "Invalid parameter substitution in {0} template", e.job_name) raise except Exception: - Log.error("{0} submission failed due to Unknown error", platform.name) + Log.error( + "{0} submission failed due to Unknown error", platform.name) raise if platform.type == "slurm" and not inspect and not only_wrappers: @@ -1355,7 +1520,8 @@ class Autosubmit: if len(valid_packages_to_submit) > 0: jobs_id = platform.submit_Script(hold=hold) if jobs_id is None: - raise BaseException("Exiting AS, AS is unable to get jobID this can be due a failure on the platform or a bad parameter on job.conf(check that queue parameter is valid for your current platform(CNS,BSC32,PRACE...)") + raise BaseException( + "Exiting AS, AS is unable to get jobID this can be due a failure on the platform or a bad parameter on job.conf(check that queue parameter is valid for your current platform(CNS,BSC32,PRACE...)") i = 0 for package in valid_packages_to_submit: for job in package.jobs: @@ -1373,11 +1539,13 @@ class Autosubmit: job_list.job_package_map[package.jobs[0].id] = wrapper_job if isinstance(package, JobPackageThread): # Saving only when it is a real multi job package - packages_persistence.save(package.name, package.jobs, package._expid, inspect) + packages_persistence.save( + package.name, package.jobs, package._expid, inspect) i += 1 save = True except WrongTemplateException as e: - Log.error("Invalid parameter substitution in {0} template", e.job_name) + Log.error( + "Invalid parameter substitution in {0} template", e.job_name) raise except Exception: Log.error("{0} submission failed", platform.name) @@ -1410,13 +1578,14 @@ class Autosubmit: exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) - if not os.path.exists(exp_path): - Log.critical("The directory %s is needed and does not exist." % exp_path) + Log.critical( + "The directory %s is needed and does not exist." % exp_path) Log.warning("Does an experiment with the given id exist?") return 1 - Log.set_file(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, BasicConfig.LOCAL_TMP_DIR,BasicConfig.LOCAL_ASLOG_DIR, 'monitor.log')) + Log.set_file(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, + BasicConfig.LOCAL_TMP_DIR, BasicConfig.LOCAL_ASLOG_DIR, 'monitor.log')) Log.info("Getting job list...") as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) @@ -1428,10 +1597,10 @@ class Autosubmit: pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') - job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive,monitor=True) + job_list = Autosubmit.load_job_list( + expid, as_conf, notransitive=notransitive, monitor=True) Log.debug("Job list restored from {0} files", pkl_dir) - if not isinstance(job_list, type([])): jobs = [] if filter_chunks: @@ -1445,15 +1614,19 @@ class Autosubmit: data = json.loads(Autosubmit._create_json(fc)) for date_json in data['sds']: date = date_json['sd'] - jobs_date = filter(lambda j: date2str(j.date) == date, job_list.get_job_list()) + jobs_date = filter(lambda j: date2str( + j.date) == date, job_list.get_job_list()) for member_json in date_json['ms']: member = member_json['m'] - jobs_member = filter(lambda j: j.member == member, jobs_date) + jobs_member = filter( + lambda j: j.member == member, jobs_date) for chunk_json in member_json['cs']: chunk = int(chunk_json) - jobs = jobs + [job for job in filter(lambda j: j.chunk == chunk, jobs_member)] + jobs = jobs + \ + [job for job in filter( + lambda j: j.chunk == chunk, jobs_member)] elif filter_status: Log.debug("Filtering jobs with status {0}", filter_status) @@ -1461,7 +1634,8 @@ class Autosubmit: jobs = job_list.get_job_list() else: fs = Autosubmit._get_status(filter_status) - jobs = [job for job in filter(lambda j: j.status == fs, job_list.get_job_list())] + jobs = [job for job in filter( + lambda j: j.status == fs, job_list.get_job_list())] elif filter_section: ft = filter_section @@ -1486,9 +1660,6 @@ class Autosubmit: else: jobs = job_list.get_job_list() - - - referenced_jobs_to_remove = set() for job in jobs: for child in job.children: @@ -1504,20 +1675,22 @@ class Autosubmit: # for job in jobs: # print(job.name + " from " + str(job.platform_name)) # return False - #WRAPPERS + # WRAPPERS if as_conf.get_wrapper_type() != 'none' and check_wrapper: # Class constructor creates table if it does not exist packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - # Permissons - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0644) - #Database modification + # Permissons + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, + "pkl", "job_packages_" + expid + ".db"), 0o644) + # Database modification packages_persistence.reset_table(True) referenced_jobs_to_remove = set() job_list_wrappers = copy.deepcopy(job_list) jobs_wr_aux = copy.deepcopy(jobs) jobs_wr = [] - [jobs_wr.append(job) for job in jobs_wr_aux if (job.status == Status.READY or job.status == Status.WAITING)] + [jobs_wr.append(job) for job in jobs_wr_aux if ( + job.status == Status.READY or job.status == Status.WAITING)] for job in jobs_wr: for child in job.children: if child not in jobs_wr: @@ -1533,14 +1706,12 @@ class Autosubmit: packages_persistence, True) packages = packages_persistence.load(True) - packages+= JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid).load() + packages += JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid).load() else: packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid).load() - - groups_dict = dict() if group_by: status = list() @@ -1548,36 +1719,40 @@ class Autosubmit: for s in expand_status.split(): status.append(Autosubmit._get_status(s.upper())) - job_grouping = JobGrouping(group_by, copy.deepcopy(jobs), job_list, expand_list=expand, expanded_status=status) + job_grouping = JobGrouping(group_by, copy.deepcopy( + jobs), job_list, expand_list=expand, expanded_status=status) groups_dict = job_grouping.group_jobs() monitor_exp = Monitor() if txt_only or txt_logfiles: - monitor_exp.generate_output_txt(expid, jobs, os.path.join(exp_path,"/tmp/LOG_"+expid),txt_logfiles, job_list_object=job_list) + monitor_exp.generate_output_txt(expid, jobs, os.path.join( + exp_path, "/tmp/LOG_"+expid), txt_logfiles, job_list_object=job_list) else: # if file_format is set, use file_format, otherwise use conf value - monitor_exp.generate_output(expid, - jobs, - os.path.join(exp_path, "/tmp/LOG_", expid), - output_format= file_format if file_format is not None else output_type, - packages=packages, - show=not hide, - groups=groups_dict, + monitor_exp.generate_output(expid, + jobs, + os.path.join( + exp_path, "/tmp/LOG_", expid), + output_format=file_format if file_format is not None else output_type, + packages=packages, + show=not hide, + groups=groups_dict, hide_groups=hide_groups, job_list_object=job_list) - + if detail: - current_length = len(job_list.get_job_list()) + current_length = len(job_list.get_job_list()) if current_length > 1000: - Log.warning("-d option: Experiment has too many jobs to be printed in the terminal. Maximum job quantity is 1000, your experiment has " + str(current_length) + " jobs.") + Log.warning( + "-d option: Experiment has too many jobs to be printed in the terminal. Maximum job quantity is 1000, your experiment has " + str(current_length) + " jobs.") else: Log.info(job_list.print_with_status()) return True @staticmethod - def statistics(expid, filter_type, filter_period, file_format, hide,notransitive=False): + def statistics(expid, filter_type, filter_period, file_format, hide, notransitive=False): """ Plots statistics graph for a given experiment. Plot is created in experiment's plot folder with name __