diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index 8122146d550e191aada90792fe4d22993cd4c406..6fc58eb58deecb4a19896d1259e964b61c1e391e 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -2795,357 +2795,353 @@ class Autosubmit:
                     7000, str(e))
             return True
 
-    @staticmethod
-    def migrate(experiment_id, offer, pickup, only_remote):
+    def get_platforms_grouped_by_dir(platforms):
         """
-        Migrates experiment files from current to other user.
-        It takes mapping information for new user from config files.
-
-        :param experiment_id: experiment identifier:
-        :param pickup:
-        :param offer:
-        :param only_remote:
+        Groups the platforms by the directory where the data is stored
+        :param platforms: list of platforms
+        :return: dictionary with the directory as key and a list of platforms as value
         """
-
-        if offer:
-            as_conf = AutosubmitConfig(
-                experiment_id, BasicConfig, YAMLParserFactory())
-            as_conf.check_conf_files(True)
-            pkl_dir = os.path.join(
-                BasicConfig.LOCAL_ROOT_DIR, experiment_id, 'pkl')
-            job_list = Autosubmit.load_job_list(
-                experiment_id, as_conf, notransitive=True, monitor=True)
-            Log.debug("Job list restored from {0} files", pkl_dir)
-            error = False
-            platforms_to_test = set()
-            submitter = Autosubmit._get_submitter(as_conf)
-            submitter.load_platforms(as_conf)
-            if submitter.platforms is None:
-                raise AutosubmitCritical("No platforms configured!!!", 7014)
-            platforms = submitter.platforms
-            for job in job_list.get_job_list():
-                job.submitter = submitter
-                if job.platform_name is None:
-                    job.platform_name = as_conf.get_platform()
-                platforms_to_test.add(platforms[job.platform_name])
-            # establish the connection to all platforms on use
-            Autosubmit.restore_platforms(platforms_to_test)
-            Log.info('Migrating experiment {0}'.format(experiment_id))
-            Autosubmit._check_ownership(experiment_id, raise_error=True)
-            if submitter.platforms is None:
-                return False
-            Log.info("Checking remote platforms")
-            platforms = [x for x in submitter.platforms if x not in [
-                'local', 'LOCAL']]
-            already_moved = set()
-            backup_files = []
-            backup_conf = []
-            error = False
-            err_message = 'Invalid Configuration:'
-            for platform in platforms:
-                # Checks
-                Log.info(
-                    "Checking [{0}] from platforms configuration...", platform)
-                if as_conf.get_migrate_user_to(platform) == '':
-                    err_message += "\nInvalid USER_TO target [ USER == USER_TO in [{0}] ]".format(
-                        platform)
-                    error = True
-                elif not as_conf.get_migrate_duplicate(platform) and as_conf.get_migrate_user_to(
-                        platform) == as_conf.get_current_user(platform):
-                    err_message += "\nInvalid USER_TO target [ USER == USER_TO in ({0}) ] while parameter SAME_USER is false (or unset)".format(
-                        platform)
-                    error = True
-                p = submitter.platforms[platform]
-                if p.temp_dir is None:
-                    err_message += "\nInvalid TEMP_DIR, Parameter must be present even if empty in [{0}]".format(
-                        platform)
-                    error = True
-                elif p.temp_dir != "":
-                    if not p.check_tmp_exists():
-                        err_message += "\nTEMP_DIR {0}, does not exists in [{1}]".format(
-                            p.temp_dir, platform)
-                        error = True
-            if error:
-                raise AutosubmitCritical(err_message, 7014)
-            for platform in platforms:
-                if as_conf.get_migrate_project_to(platform) != '':
-                    Log.info("Project in platform configuration file successfully updated to {0}",
-                             as_conf.get_current_project(platform))
-                    as_conf.get_current_project(platform)
-                    backup_conf.append([platform, as_conf.get_current_user(
-                        platform), as_conf.get_current_project(platform)])
-                    as_conf.set_new_user(
-                        platform, as_conf.get_migrate_user_to(platform))
-
-                    as_conf.set_new_project(
-                        platform, as_conf.get_migrate_project_to(platform))
-                    as_conf.get_current_project(platform)
-                    as_conf.get_current_user(platform)
-                else:
-                    Log.result(
-                        "[OPTIONAL] PROJECT_TO directive not found. The directive PROJECT will remain unchanged")
-                    backup_conf.append(
-                        [platform, as_conf.get_current_user(platform), None])
-                    as_conf.set_new_user(
-                        platform, as_conf.get_migrate_user_to(platform))
-                    as_conf.get_current_project(platform)
-                    as_conf.get_current_user(platform)
-
-                if as_conf.get_migrate_host_to(platform) != "none" and len(as_conf.get_migrate_host_to(platform)) > 0:
-                    Log.result(
-                        "Host in platform configuration file successfully updated to {0}",
-                        as_conf.get_migrate_host_to(platform))
-                    as_conf.set_new_host(
-                        platform, as_conf.get_migrate_host_to(platform))
+        platforms_by_dir = defaultdict(list)
+        for platform in platforms:
+            platforms_by_dir[platform.root_dir].append(platform)
+        return platforms_by_dir
+    @staticmethod
+    def check_migrate_config(as_conf,platforms,new_platform_data):
+        """
+        Checks if the configuration file has the necessary information to migrate the data
+        :param as_conf: Autosubmit configuration file
+        :param platforms: list of platforms
+        :return: platforms to migrate
+        """
+        error = False
+        platforms_to_migrate = list()
+        platforms_with_misconfiguration = ""
+        platforms_by_dir = Autosubmit.get_platforms_grouped_by_dir(platforms)
+        for platform_dir,platforms_list in platforms_by_dir.items():
+            platform_dir_error = True
+            for platform in platforms_list:
+                if platform.name.upper() == "LOCAL":
+                    platform_dir_error = False
+                    continue
+                Log.info(f"Checking [{platform.name}] from platforms configuration...")
+                if as_conf.platforms_data[platform.name].get("USER", None) == new_platform_data[platform.name].get("USER", None) and not as_conf.platforms_data[platform.name].get("SAME_USER",False):
+                    Log.debug(f"Values: USER: {as_conf.platforms_data[platform.name].get('USER',None)}\n"
+                              f"        USER_TO: {new_platform_data[platform.name].get('USER',None)}\n"
+                              f"        PROJECT: {as_conf.platforms_data[platform.name].get('PROJECT',None)}\n"
+                              f"        PROJECT_TO: {new_platform_data[platform.name].get('PROJECT',None)}\n"
+                              f"        TEMP_DIR: {as_conf.platforms_data[platform.name].get('TEMP_DIR', '')}\n")
+                    Log.debug(f"Invalid configuration for platform [{platform.name}]\nTrying next platform...")
                 else:
-                    Log.result(
-                        "[OPTIONAL] HOST_TO directive not found. The directive HOST will remain unchanged")
-                p = submitter.platforms[platform]
-                if p.temp_dir not in already_moved:
-                    if p.root_dir != p.temp_dir and len(p.temp_dir) > 0:
-                        already_moved.add(p.temp_dir)
-                        # find /home/bsc32/bsc32070/dummy3 -type l -lname '/*' -printf ' ln -sf "$(realpath -s --relative-to="%p" $(readlink "%p")")" \n' > script.sh
-                        # command = "find " + p.root_dir + " -type l -lname \'/*\' -printf 'var=\"$(realpath -s --relative-to=\"%p\" \"$(readlink \"%p\")\")\" && var=${var:3} && ln -sf $var \"%p\" \\n'"
-                        Log.info(
-                            "Converting the absolute symlinks into relatives on platform {0} ", platform)
-                        command = "find " + p.root_dir + \
-                                  " -type l -lname \'/*\' -printf 'var=\"$(realpath -s --relative-to=\"%p\" \"$(readlink \"%p\")\")\" && var=${var:3} && ln -sf $var \"%p\" \\n' "
-                        try:
-                            p.send_command(command, True)
-                            if p.get_ssh_output().startswith("var="):
-                                convertLinkPath = os.path.join(
-                                    BasicConfig.LOCAL_ROOT_DIR, experiment_id, BasicConfig.LOCAL_TMP_DIR,
-                                    'convertLink.sh')
-                                with open(convertLinkPath, 'w') as convertLinkFile:
-                                    convertLinkFile.write(p.get_ssh_output())
-                                p.send_file("convertLink.sh")
-                                convertLinkPathRemote = os.path.join(
-                                    p.remote_log_dir, "convertLink.sh")
-                                command = "chmod +x " + convertLinkPathRemote + " && " + \
-                                          convertLinkPathRemote + " && rm " + convertLinkPathRemote
-                                p.send_command(command, True)
-                            else:
-                                Log.result("No links found in {0} for [{1}] ".format(
-                                    p.root_dir, platform))
+                    Log.info("Valid configuration for platform [{0}]".format(platform.name))
+                    Log.result(f"Using platform: [{platform.name}] to migrate [{platform.root_dir}] data")
+                    platform_dir_error = False
+                    platforms_to_migrate.append(platform)
+                    break
+            if platform_dir_error:
+                error = True
+                platform_names = [p.name for p in platforms_by_dir[platform_dir]]
+                platform_names = ", ".join(platform_names)
+                platforms_with_misconfiguration += f"{platform_dir}: {platform_names}\n"
+        if error:
+            raise AutosubmitCritical(f"Invalid migrate configuration for platforms: {platforms_with_misconfiguration} ", 7014)
+        else:
+            return platforms_to_migrate
 
-                        except IOError:
-                            Log.debug(
-                                "The platform {0} does not contain absolute symlinks", platform)
-                        except BaseException:
-                            Log.printlog(
-                                "Absolute symlinks failed to convert, check user in platform.yml", 3000)
-                            error = True
-                            break
-                    try:
-                        Log.info(
-                            "Moving remote files/dirs on {0}", platform)
-                        p.send_command("chmod 777 -R " + p.root_dir)
-                        if not p.move_file(p.root_dir, os.path.join(p.temp_dir, experiment_id), False):
-                            Log.result("No data found in {0} for [{1}]\n".format(
-                                p.root_dir, platform))
-                    except IOError as e:
-                        Log.printlog("The files/dirs on {0} cannot be moved to {1}.".format(p.root_dir,
-                                                                                            os.path.join(p.temp_dir,
-                                                                                                         experiment_id),
-                                                                                            6012))
-                        error = True
-                        break
-                    except Exception as e:
-                        Log.printlog("Trace: {2}\nThe files/dirs on {0} cannot be moved to {1}.".format(
-                            p.root_dir, os.path.join(p.temp_dir, experiment_id), str(e)), 6012)
-                        error = True
-                        break
-                    backup_files.append(platform)
-                    Log.result(
-                        "Files/dirs on {0} have been successfully offered", platform)
-            if error:
-                as_conf = AutosubmitConfig(
-                    experiment_id, BasicConfig, YAMLParserFactory())
-                as_conf.check_conf_files(False)
-                for platform in backup_files:
-                    p = submitter.platforms[platform]
-                    p.move_file(os.path.join(
-                        p.temp_dir, experiment_id), p.root_dir, True)
-                for platform in backup_conf:
-                    as_conf.set_new_user(platform[0], platform[1])
-                    if platform[2] is not None and len(str(platform[2])) > 0:
-                        as_conf.set_new_project(platform[0], platform[2])
-                    if as_conf.get_migrate_host_to(platform[0]) != "none" and len(
-                            as_conf.get_migrate_host_to(platform[0])) > 0:
-                        as_conf.set_new_host(
-                            platform[0], as_conf.get_migrate_host_to(platform[0]))
-                raise AutosubmitCritical(
-                    "The experiment cannot be offered, changes are reverted", 7014)
-            else:
+    @staticmethod
+    def migrate_offer_local(experiment_id):
+        try:
+            if not Autosubmit.archive(experiment_id, True, True):
+                raise AutosubmitCritical(f"Error archiving the experiment", 7014)
+            Log.result("The experiment has been successfully offered.")
+        except Exception as e:
+            # todo put the IO error code
+            raise AutosubmitCritical(f"[LOCAL] Error offering the experiment: {str(e)}\n"
+                                     f"Please, try again", 7000)
+    @staticmethod
+    def get_migrate_info(as_conf):
+        migrate_file = as_conf.experiment_data.get("AS_MIGRATE", None)
+        if migrate_file is None:
+            raise AutosubmitCritical(
+                "No migrate information found\nPlease add a key named AS_MIGRATE with the path to the file", 7014)
+
+        # expand home if needed
+        migrate_file = Path(os.path.expanduser(migrate_file))
+        # If it does not exist, raise an error
+        if not migrate_file.exists():
+            raise AutosubmitCritical(f"File {migrate_file} does not exist", 7014)
+        # Merge platform keys with migrate keys that should be the old credentials
+        # Migrate file consists of:
+        # platform_name: must match the platform name in the platforms configuration file, must have the old user
+        #   USER: user
+        #   PROJECT: project
+        #   Host ( optional ) : host of the machine if using alias
+        #   TEMP_DIR: temp dir for current platform, because can be different for each of them
+        return as_conf.load_config_file(as_conf.experiment_data, migrate_file)
+    @staticmethod
+    def migrate_offer_remote(experiment_id):
+        # Init the configuration
+        as_conf = AutosubmitConfig(experiment_id, BasicConfig, YAMLParserFactory())
+        as_conf.check_conf_files(False)
+        # Load migrate
+        # Find migrate file
+        new_platform_data = copy.deepcopy(as_conf.platforms_data)
+        migrate_file = as_conf.experiment_data.get("AS_MIGRATE", None)
+        if migrate_file is None:
+            raise AutosubmitCritical("No migrate information found\nPlease add a key named AS_MIGRATE with the path to the file", 7014)
+        # expand home if needed
+        migrate_file = Path(os.path.expanduser(migrate_file))
+        # If it does not exist, raise an error
+        if not migrate_file.exists():
+            raise AutosubmitCritical(f"File {migrate_file} does not exist", 7014)
+        # Merge platform keys with migrate keys that should be the old credentials
+        # Migrate file consists of:
+        # platform_name: must match the platform name in the platforms configuration file, must have the old user
+        #   USER: user
+        #   PROJECT: project
+        #   Host ( optional ) : host of the machine if using alias
+        #   TEMP_DIR: temp dir for current platform, because can be different for each of them
+        as_conf.experiment_data = Autosubmit.get_migrate_info(as_conf)
+        pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, experiment_id, 'pkl')
+        job_list = Autosubmit.load_job_list(experiment_id, as_conf, notransitive=True, monitor=True)
+        Log.debug("Job list restored from {0} files", pkl_dir)
+        platforms_to_test = set()
+        submitter = Autosubmit._get_submitter(as_conf)
+        submitter.load_platforms(as_conf)
+        if submitter.platforms is None:
+            raise AutosubmitCritical("No platforms configured!!!", 7014)
+        platforms = submitter.platforms
+        for job in job_list.get_job_list():
+            job.submitter = submitter
+            if job.platform_name is None:
+                job.platform_name = as_conf.get_platform()
+            platforms_to_test.add(platforms[job.platform_name])
+        # establish the connection to all platforms on use
+        Autosubmit.restore_platforms(platforms_to_test)
+        Log.info('Migrating experiment {0}'.format(experiment_id))
+        Autosubmit._check_ownership(experiment_id, raise_error=True)
+        if submitter.platforms is None:
+            return False
+        Log.info("Checking remote platforms")
+        # [x for x in submitter.platforms if x not in ['local', 'LOCAL']]
+        # Checks and annotates the platforms to migrate ( one per directory if they share it )
+        platforms_to_migrate = Autosubmit.check_migrate_config(as_conf,platforms_to_test,new_platform_data)
+        platforms_without_issues = list()
+        for platform in platforms_to_migrate:
+            p = submitter.platforms[platform.name]
+            if p.root_dir != p.temp_dir and len(p.temp_dir) > 0:
+                Log.info(f"Converting the absolute symlinks into relatives on platform [{platform.name}] ")
+                command = f"find {p.root_dir} -type l -lname \'/*\' -printf 'var=\"$(realpath -s --relative-to=\"%p\" \"$(readlink \"%p\")\")\" && var=${{var:3}} && ln -sf $var \"%p\" \\n' "
                 try:
-                    if not only_remote:
-                        if not Autosubmit.archive(experiment_id, True, True):
-                            for platform in backup_files:
-                                p = submitter.platforms[platform]
-                                p.move_file(os.path.join(
-                                    p.temp_dir, experiment_id), p.root_dir, True)
-                            for platform in backup_conf:
-                                as_conf.set_new_user(platform[0], platform[1])
-                                if platform[2] is not None and len(str(platform[2])) > 0:
-                                    as_conf.set_new_project(
-                                        platform[0], platform[2])
-                            raise AutosubmitCritical(
-                                "The experiment cannot be offered, changes are reverted", 7014)
-                    Log.result("The experiment has been successfully offered.")
-                except Exception as e:
-                    for platform in backup_files:
-                        p = submitter.platforms[platform]
-                        p.move_file(os.path.join(
-                            p.temp_dir, experiment_id), p.root_dir, True)
-                    for platform in backup_conf:
-                        as_conf.set_new_user(platform[0], platform[1])
-                        if platform[2] is not None and len(str(platform[2])) > 0:
-                            as_conf.set_new_project(platform[0], platform[2])
-                    raise AutosubmitCritical(
-                        "The experiment cannot be offered, changes are reverted", 7014, str(e))
-        elif pickup:
-            Log.info('Migrating experiment {0}'.format(experiment_id))
+                    p.send_command(command, True)
+                    if p.get_ssh_output().startswith("var="):
+                        convertLinkPath = os.path.join(
+                            BasicConfig.LOCAL_ROOT_DIR, experiment_id, BasicConfig.LOCAL_TMP_DIR,
+                            'convertLink.sh')
+                        with open(convertLinkPath, 'w') as convertLinkFile:
+                            convertLinkFile.write(p.get_ssh_output())
+                        p.send_file("convertLink.sh")
+                        convertLinkPathRemote = os.path.join(
+                            p.remote_log_dir, "convertLink.sh")
+                        command = "chmod +x " + convertLinkPathRemote + " && " + \
+                                  convertLinkPathRemote + " && rm " + convertLinkPathRemote
+                        p.send_command(command, True)
+                        Log.result(f"Absolute symlinks converted on platform [{platform.name}]")
+                    else:
+                        Log.result(f"No absolute symlinks found in [{p.root_dir}] for platform [{platform.name}]")
+                except IOError:
+                    Log.result(f"No absolute symlinks found in [{p.root_dir}] for platform [{platform.name}]")
+                except BaseException as e:
+                    Log.printlog(f"Absolute symlinks failed to convert due to [{str(e)}] on platform [{platform.name}]",
+                                 7014)
+                    break
+            # If there are no errors in the conversion of the absolute symlinks, then move the files of this platform
+            try:
+                Log.info(f"Moving remote files/dirs on platform [{platform.name}] to [{p.temp_dir}]")
+                p.send_command(f"chmod 777 -R {p.root_dir}")
+                if not p.move_file(p.root_dir, os.path.join(p.temp_dir, experiment_id), False):
+                    Log.result(f"No data found in [{p.root_dir}] for platform [{platform.name}]")
+                else:
+                    Log.result(
+                        f"Remote files/dirs on platform [{platform.name}] have been successfully moved to [{p.temp_dir}]")
+            except BaseException as e:
+                Log.printlog(
+                    f"Cannot move files/dirs on platform [{platform.name}] to [{p.temp_dir}] due to [{str(e)}]",
+                    6000)
+                break
+            platforms_without_issues.append(platform)
+            Log.result(f"Platform [{platform.name}] has been successfully migrated")
+
+        # At this point, all remote platforms have been migrated.
+        # TODO set user_to and project_to to the correct values
+    @staticmethod
+    def migrate_offer(experiment_id,only_remote):
+        Autosubmit.migrate_offer_remote(experiment_id)
+        if not only_remote:
+            Autosubmit.migrate_offer_local(experiment_id)
+
+    @staticmethod
+    def migrate_pickup(experiment_id, only_remote):
+        Log.info(f'Migrating experiment {experiment_id}')
+        if not only_remote:
             Log.info("Moving local files/dirs")
-            if not only_remote:
-                if not Autosubmit.unarchive(experiment_id, True):
+            if not Autosubmit.unarchive(experiment_id, True, False):
+                if not Path(os.path.join(BasicConfig.LOCAL_ROOT_DIR, experiment_id)).exists():
                     raise AutosubmitCritical(
                         "The experiment cannot be picked up", 7012)
-                Log.info("Local files/dirs have been successfully picked up")
-            else:
-                exp_path = os.path.join(
-                    BasicConfig.LOCAL_ROOT_DIR, experiment_id)
-                if not os.path.exists(exp_path):
-                    raise AutosubmitCritical(
-                        "Experiment seems to be archived, no action is performed", 7012)
-
-            as_conf = AutosubmitConfig(
-                experiment_id, BasicConfig, YAMLParserFactory())
-            as_conf.check_conf_files(False)
-            pkl_dir = os.path.join(
-                BasicConfig.LOCAL_ROOT_DIR, experiment_id, 'pkl')
-            job_list = Autosubmit.load_job_list(
-                experiment_id, as_conf, notransitive=True, monitor=True)
-            Log.debug("Job list restored from {0} files", pkl_dir)
-            error = False
-            platforms_to_test = set()
-            submitter = Autosubmit._get_submitter(as_conf)
-            submitter.load_platforms(as_conf)
-            if submitter.platforms is None:
-                raise AutosubmitCritical("No platforms configured!!!", 7014)
-            platforms = submitter.platforms
-            for job in job_list.get_job_list():
+            Log.info("Local files/dirs have been successfully picked up")
+        else:
+            exp_path = os.path.join(
+                BasicConfig.LOCAL_ROOT_DIR, experiment_id)
+            if not os.path.exists(exp_path):
+                raise AutosubmitCritical(
+                    "Experiment seems to be archived, no action is performed\nHint: Try to pickup without the remote flag", 7012)
+        as_conf = AutosubmitConfig(
+            experiment_id, BasicConfig, YAMLParserFactory())
+        as_conf.check_conf_files(False)
+        pkl_dir = os.path.join(
+            BasicConfig.LOCAL_ROOT_DIR, experiment_id, 'pkl')
+        job_list = Autosubmit.load_job_list(
+            experiment_id, as_conf, notransitive=True, monitor=True)
+        Log.debug("Job list restored from {0} files", pkl_dir)
+        error = False
+        platforms_to_test = set()
+        submitter = Autosubmit._get_submitter(as_conf)
+        submitter.load_platforms(as_conf)
+        if submitter.platforms is None:
+            raise AutosubmitCritical("No platforms configured!!!", 7014)
+        platforms = submitter.platforms
+        job_sections_check = set()
+        for job in job_list.get_job_list():
+            if job.section not in job_sections_check:
+                job_sections_check.add(job.section)
                 job.submitter = submitter
                 if job.platform_name is None:
                     job.platform_name = as_conf.get_platform()
                 platforms_to_test.add(platforms[job.platform_name])
-            Log.info("Checking remote platforms")
-            platforms = [x for x in submitter.platforms if x not in [
-                'local', 'LOCAL']]
-            already_moved = set()
-            backup_files = []
-            # establish the connection to all platforms on use
-            try:
-                Autosubmit.restore_platforms(platforms_to_test)
-            except AutosubmitCritical as e:
-                raise AutosubmitCritical(
-                    e.message + "\nInvalid Remote Platform configuration, recover them manually or:\n 1) Configure platform.yml with the correct info\n 2) autosubmit expid -p --onlyremote",
-                    7014, e.trace)
-            except Exception as e:
-                raise AutosubmitCritical(
-                    "Invalid Remote Platform configuration, recover them manually or:\n 1) Configure platform.yml with the correct info\n 2) autosubmit expid -p --onlyremote",
-                    7014, str(e))
-            for platform in platforms:
-                p = submitter.platforms[platform]
-                if p.temp_dir is not None and p.temp_dir not in already_moved:
-                    if p.root_dir != p.temp_dir and len(p.temp_dir) > 0:
-                        already_moved.add(p.temp_dir)
-                        Log.info(
-                            "Copying remote files/dirs on {0}", platform)
-                        Log.info("Copying from {0} to {1}", os.path.join(
-                            p.temp_dir, experiment_id), p.root_dir)
-                        finished = False
-                        limit = 150
-                        rsync_retries = 0
-                        try:
-                            # Avoid infinite loop unrealistic upper limit, only for rsync failure
-                            while not finished and rsync_retries < limit:
-                                finished = False
-                                pipeline_broke = False
-                                Log.info(
-                                    "Rsync launched {0} times. Can take up to 150 retrials or until all data is transferred".format(
-                                        rsync_retries + 1))
-                                try:
-                                    p.send_command(
-                                        "rsync --timeout=3600 --bwlimit=20000 -aq --remove-source-files " + os.path.join(
-                                            p.temp_dir, experiment_id) + " " + p.root_dir[:-5])
-                                except BaseException as e:
-                                    Log.debug("{0}".format(str(e)))
-                                    rsync_retries += 1
-                                    try:
-                                        if p.get_ssh_output_err() == "":
-                                            finished = True
-                                        elif p.get_ssh_output_err().lower().find("no such file or directory") == -1:
-                                            finished = True
-                                        else:
-                                            finished = False
-                                    except Exception as e:
-                                        finished = False
-                                        pipeline_broke = True
-                                if not pipeline_broke:
-                                    if p.get_ssh_output_err().lower().find("no such file or directory") == -1:
-                                        finished = True
-                                    elif p.get_ssh_output_err().lower().find(
-                                            "warning: rsync") != -1 or p.get_ssh_output_err().lower().find(
-                                            "closed") != -1 or p.get_ssh_output_err().lower().find(
-                                            "broken pipe") != -1 or p.get_ssh_output_err().lower().find(
-                                            "directory has vanished") != -1:
-                                        rsync_retries += 1
-                                        finished = False
-                                    elif p.get_ssh_output_err() == "":
-                                        finished = True
-                                    else:
-                                        error = True
-                                        finished = False
-                                        break
-                                p.send_command(
-                                    "find {0} -depth -type d -empty -delete".format(
-                                        os.path.join(p.temp_dir, experiment_id)))
-                                Log.result(
-                                    "Empty dirs on {0} have been successfully deleted".format(p.temp_dir))
-                            if finished:
-                                p.send_command("chmod 755 -R " + p.root_dir)
-                                Log.result(
-                                    "Files/dirs on {0} have been successfully picked up", platform)
-                                # p.send_command(
-                                #     "find {0} -depth -type d -empty -delete".format(os.path.join(p.temp_dir, experiment_id)))
-                                Log.result(
-                                    "Empty dirs on {0} have been successfully deleted".format(p.temp_dir))
-                            else:
-                                Log.printlog("The files/dirs on {0} cannot be copied to {1}.".format(
-                                    os.path.join(p.temp_dir, experiment_id), p.root_dir), 6012)
-                                error = True
-                                break
-
-                        except IOError as e:
-                            raise AutosubmitError(
-                                "I/O Issues", 6016, e.message)
-                        except BaseException as e:
-                            error = True
-                            Log.printlog("The files/dirs on {0} cannot be copied to {1}.\nTRACE:{2}".format(
-                                os.path.join(p.temp_dir, experiment_id), p.root_dir, str(e)), 6012)
-                            break
-                else:
-                    Log.result(
-                        "Files/dirs on {0} have been successfully picked up", platform)
-            if error:
-                raise AutosubmitCritical(
-                    "Unable to pickup all platforms, the non-moved files are on the TEMP_DIR\n You can try again with autosubmit {0} -p --onlyremote".format(
-                        experiment_id), 7012)
-            else:
-                Log.result("The experiment has been successfully picked up.")
-                return True
+        Log.info("Checking remote platforms")
+        platforms = [x for x in submitter.platforms if x not in [
+            'local', 'LOCAL']]
+        already_moved = set()
+        backup_files = []
+        # establish the connection to all platforms on use
+        try:
+            Autosubmit.restore_platforms(platforms_to_test)
+        except AutosubmitCritical as e:
+            raise AutosubmitCritical(
+                e.message + "\nInvalid Remote Platform configuration, recover them manually or:\n 1) Configure platform.yml with the correct info\n 2) autosubmit expid -p --onlyremote",
+                7014, e.trace)
+        except Exception as e:
+            raise AutosubmitCritical(
+                "Invalid Remote Platform configuration, recover them manually or:\n 1) Configure platform.yml with the correct info\n 2) autosubmit expid -p --onlyremote",
+                7014, str(e))
+        for platform in platforms:
+            p = submitter.platforms[platform]
+            if p.temp_dir is not None and p.temp_dir not in already_moved:
+                if p.root_dir != p.temp_dir and len(p.temp_dir) > 0:
+                    already_moved.add(p.temp_dir)
+                    Log.info(
+                        "Copying remote files/dirs on {0}", platform)
+                    Log.info("Copying from {0} to {1}", os.path.join(
+                        p.temp_dir, experiment_id), p.root_dir)
+                    finished = False
+                    limit = 150
+                    rsync_retries = 0
+                    try:
+                        # Avoid infinite loop unrealistic upper limit, only for rsync failure
+                        while not finished and rsync_retries < limit:
+                            finished = False
+                            pipeline_broke = False
+                            Log.info(
+                                "Rsync launched {0} times. Can take up to 150 retrials or until all data is transferred".format(
+                                    rsync_retries + 1))
+                            try:
+                                p.send_command(
+                                    "rsync --timeout=3600 --bwlimit=20000 -aq --remove-source-files " + os.path.join(
+                                        p.temp_dir, experiment_id) + " " + p.root_dir[:-5])
+                            except BaseException as e:
+                                Log.debug("{0}".format(str(e)))
+                                rsync_retries += 1
+                                try:
+                                    if p.get_ssh_output_err() == "":
+                                        finished = True
+                                    elif p.get_ssh_output_err().lower().find("no such file or directory") == -1:
+                                        finished = True
+                                    else:
+                                        finished = False
+                                except Exception as e:
+                                    finished = False
+                                    pipeline_broke = True
+                            if not pipeline_broke:
+                                if p.get_ssh_output_err().lower().find("no such file or directory") == -1:
+                                    finished = True
+                                elif p.get_ssh_output_err().lower().find(
+                                        "warning: rsync") != -1 or p.get_ssh_output_err().lower().find(
+                                        "closed") != -1 or p.get_ssh_output_err().lower().find(
+                                        "broken pipe") != -1 or p.get_ssh_output_err().lower().find(
+                                        "directory has vanished") != -1:
+                                    rsync_retries += 1
+                                    finished = False
+                                elif p.get_ssh_output_err() == "":
+                                    finished = True
+                                else:
+                                    error = True
+                                    finished = False
+                                    break
+                            p.send_command(
+                                "find {0} -depth -type d -empty -delete".format(
+                                    os.path.join(p.temp_dir, experiment_id)))
+                            Log.result(
+                                "Empty dirs on {0} have been successfully deleted".format(p.temp_dir))
+                        if finished:
+                            p.send_command("chmod 755 -R " + p.root_dir)
+                            Log.result(
+                                "Files/dirs on {0} have been successfully picked up", platform)
+                            # p.send_command(
+                            #     "find {0} -depth -type d -empty -delete".format(os.path.join(p.temp_dir, experiment_id)))
+                            Log.result(
+                                "Empty dirs on {0} have been successfully deleted".format(p.temp_dir))
+                        else:
+                            Log.printlog("The files/dirs on {0} cannot be copied to {1}.".format(
+                                os.path.join(p.temp_dir, experiment_id), p.root_dir), 6012)
+                            error = True
+                            break
+
+                    except IOError as e:
+                        raise AutosubmitError(
+                            "I/O Issues", 6016, e.message)
+                    except BaseException as e:
+                        error = True
+                        Log.printlog("The files/dirs on {0} cannot be copied to {1}.\nTRACE:{2}".format(
+                            os.path.join(p.temp_dir, experiment_id), p.root_dir, str(e)), 6012)
+                        break
+            else:
+                Log.result(
+                    "Files/dirs on {0} have been successfully picked up", platform)
+        if error:
+            raise AutosubmitCritical(
+                "Unable to pickup all platforms, the non-moved files are on the TEMP_DIR\n You can try again with autosubmit {0} -p --onlyremote".format(
+                    experiment_id), 7012)
+        else:
+            Log.result("The experiment has been successfully picked up.")
+            return True
+    @staticmethod
+    def migrate(experiment_id, offer, pickup, only_remote):
+        """
+        Migrates experiment files from current to other user.
+        It takes mapping information for new user from config files.
+
+        :param experiment_id: experiment identifier:
+        :param pickup:
+        :param offer:
+        :param only_remote:
+        """
+
+        if offer:
+            Autosubmit.migrate_offer(experiment_id, only_remote)
+        elif pickup:
+            Autosubmit.migrate_pickup(experiment_id, only_remote)
 
     @staticmethod
     def check(experiment_id, notransitive=False):
@@ -4261,7 +4257,7 @@ class Autosubmit:
         return True
 
     @staticmethod
-    def unarchive(experiment_id, uncompressed=True):
+    def unarchive(experiment_id, uncompressed=True, show_err_log = True):
         """
         Unarchives an experiment: uncompress folder from tar.gz and moves to experiment root folder
 
@@ -4290,7 +4286,8 @@ class Autosubmit:
 
             year -= 1
         if year == 2000:
-            Log.error("Experiment {0} is not archived", experiment_id)
+            if show_err_log:
+                Log.error("Experiment {0} is not archived", experiment_id)
             return False
         Log.info("Experiment located in {0} archive", year)
 
@@ -4304,7 +4301,8 @@ class Autosubmit:
             tar.close()
         except Exception as e:
             shutil.rmtree(exp_folder, ignore_errors=True)
-            Log.printlog("Can not extract tar file: {0}".format(str(e)), 6012)
+            if show_err_log:
+                Log.printlog("Can not extract tar file: {0}".format(str(e)), 6012)
             return False
 
         Log.info("Unpacking finished")
diff --git a/requeriments.txt b/requeriments.txt
index f612dafd597edd9167ce193e39a33b0250015c11..b97597236887778ad50f34ecc47ade3662a1f0d7 100644
--- a/requeriments.txt
+++ b/requeriments.txt
@@ -1,4 +1,4 @@
-autosubmitconfigparser==1.0.22
+autosubmitconfigparser==1.0.23
 paramiko>=2.9.2
 bcrypt>=3.2
 PyNaCl>=1.5.0
diff --git a/setup.py b/setup.py
index aada781fa7ddff34462a1474034423ef02fb3a90..c37bd9d2e261b5309777fe4d581bbfaf805665b3 100644
--- a/setup.py
+++ b/setup.py
@@ -39,7 +39,7 @@ setup(
     url='http://www.bsc.es/projects/earthscience/autosubmit/',
     download_url='https://earth.bsc.es/wiki/doku.php?id=tools:autosubmit',
     keywords=['climate', 'weather', 'workflow', 'HPC'],
-    install_requires=['autosubmitconfigparser==1.0.22','bcrypt>=3.2','packaging>19','six>=1.10.0','configobj>=5.0.6','argparse>=1.4.0','python-dateutil>=2.8.2','matplotlib<3.6','numpy<1.22','py3dotplus>=1.1.0','pyparsing>=3.0.7','paramiko>=2.9.2','mock>=4.0.3','portalocker>=2.3.2','networkx==2.6.3','requests>=2.27.1','bscearth.utils>=0.5.2','cryptography>=36.0.1','setuptools>=60.8.2','xlib>=0.21','pip>=22.0.3','ruamel.yaml','pythondialog','pytest','nose','coverage','PyNaCl>=1.4.0','Pygments'],
+    install_requires=['autosubmitconfigparser==1.0.23','bcrypt>=3.2','packaging>19','six>=1.10.0','configobj>=5.0.6','argparse>=1.4.0','python-dateutil>=2.8.2','matplotlib<3.6','numpy<1.22','py3dotplus>=1.1.0','pyparsing>=3.0.7','paramiko>=2.9.2','mock>=4.0.3','portalocker>=2.3.2','networkx==2.6.3','requests>=2.27.1','bscearth.utils>=0.5.2','cryptography>=36.0.1','setuptools>=60.8.2','xlib>=0.21','pip>=22.0.3','ruamel.yaml','pythondialog','pytest','nose','coverage','PyNaCl>=1.4.0','Pygments'],
     classifiers=[
         "Programming Language :: Python :: 3.7",
         "Programming Language :: Python :: 3.9",
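
Note on the AS_MIGRATE file read by migrate_offer_remote()/get_migrate_info() in the patch above: its layout is only described in the comment block of get_migrate_info(). The sketch below is a hypothetical YAML illustration assembled from those comments; the key names (USER, PROJECT, HOST, TEMP_DIR) and the AS_MIGRATE pointer come from the patch, while the platform name, host, paths and the exact nesting expected by as_conf.load_config_file() are assumptions, not part of the patch.

    # In the experiment configuration, point AS_MIGRATE at the migrate file:
    AS_MIGRATE: ~/migrate_credentials.yml

    # ~/migrate_credentials.yml (hypothetical): the old credentials, one entry per platform
    MARENOSTRUM4:          # must match the platform name in the platforms configuration file
      USER: old_user       # user that currently owns the remote data
      PROJECT: old_project
      HOST: mn1.bsc.es     # optional, only needed when an alias is used
      TEMP_DIR: /gpfs/scratch/old_project/old_user/migrate_tmp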