diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 2aad78ca82e95203863058a8a5909ef9e298dd56..d019850a56d2bd41a38fedc0de5bb3586654a657 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -3144,15 +3144,14 @@ class Autosubmit: if offer: Autosubmit._check_ownership(experiment_id, raise_error=True) migrate.migrate_offer_remote() - if not only_remote: # Local migrate + if not only_remote: # Local migrate try: if not Autosubmit.archive(experiment_id, True, True): raise AutosubmitCritical(f"Error archiving the experiment", 7014) Log.result("The experiment has been successfully offered.") except Exception as e: - # todo put the IO error code raise AutosubmitCritical(f"[LOCAL] Error offering the experiment: {str(e)}\n" - f"Please, try again", 7000) + f"Please, try again", 7075) migrate.migrate_offer_jobdata() elif pickup: Log.info(f'Pickup experiment {experiment_id}') diff --git a/autosubmit/migrate/migrate.py b/autosubmit/migrate/migrate.py index b5d3a51cf0a3b1b13ab63c203c513e05d47fcace..03881d1931c131c6911455b316316797e70225c9 100644 --- a/autosubmit/migrate/migrate.py +++ b/autosubmit/migrate/migrate.py @@ -3,6 +3,7 @@ import tarfile import time import os +from typing import Any, Dict, List, Set from bscearth.utils.date import Log @@ -10,9 +11,12 @@ from autosubmit.helpers.utils import restore_platforms from autosubmitconfigparser.config.basicconfig import BasicConfig from autosubmitconfigparser.config.configcommon import AutosubmitConfig from autosubmitconfigparser.config.yamlparser import YAMLParserFactory -from log.log import Log, AutosubmitCritical, AutosubmitError +from log.log import Log, AutosubmitCritical from autosubmit.job.job_utils import _get_submitter +from pathlib import Path + + class Migrate: def __init__(self, experiment_id, only_remote): @@ -25,190 +29,156 @@ class Migrate: self.basic_config = BasicConfig() self.basic_config.read() - def migrate_pickup(self): + def migrate_pickup(self) -> bool: + """ + Pickup the experiment by copying remote files and directories to the local root directory. + + This function establishes connections to all platforms in use, copies remote files and directories + to the local root directory, and checks if the experiment can run. + + :raises AutosubmitCritical: If the experiment is archived or if there are issues with remote platform configuration. + :return: True if the experiment is successfully picked up. + """ Log.info(f'Pickup experiment {self.experiment_id}') - exp_path = os.path.join( - self.basic_config.LOCAL_ROOT_DIR, self.experiment_id) - if not os.path.exists(exp_path): + exp_path = Path(self.basic_config.LOCAL_ROOT_DIR) / self.experiment_id + if not exp_path.exists(): raise AutosubmitCritical( - "Experiment seems to be archived, no action is performed\nHint: Try to pickup without the remote flag", 7012) - as_conf = AutosubmitConfig( - self.experiment_id, self.basic_config, YAMLParserFactory()) + "Experiment seems to be archived, no action is performed\nHint: Try to pickup without the remote flag", + 7012) + + as_conf = AutosubmitConfig(self.experiment_id, self.basic_config, YAMLParserFactory()) as_conf.reload() - as_conf.experiment_data["PLATFORMS"] = as_conf.misc_data.get("PLATFORMS",{}) + as_conf.experiment_data["PLATFORMS"] = as_conf.misc_data.get("PLATFORMS", {}) platforms = self.load_platforms_in_use(as_conf) error = False Log.info("Checking remote platforms") already_moved = set() - # establish the connection to all platforms on use + try: restore_platforms(platforms) - except AutosubmitCritical as e: - raise AutosubmitCritical( - e.message + "\nInvalid Remote Platform configuration, recover them manually or:\n 1) Configure platform.yml with the correct info\n 2) autosubmit expid -p --onlyremote", - 7014, e.trace) except Exception as e: - raise AutosubmitCritical( - "Invalid Remote Platform configuration, recover them manually or:\n 1) Configure platform.yml with the correct info\n 2) autosubmit expid -p --onlyremote", - 7014, str(e)) + self._handle_restore_platforms_error(e) + for p in platforms: - if p.temp_dir is not None and p.temp_dir not in already_moved: - if p.root_dir != p.temp_dir and len(p.temp_dir) > 0: + if p.temp_dir and p.temp_dir not in already_moved: + if p.root_dir != p.temp_dir and p.temp_dir: already_moved.add(p.temp_dir) - Log.info( - "Copying remote files/dirs on {0}", p.name) - Log.info("Copying from {0} to {1}", os.path.join( - p.temp_dir, self.experiment_id), p.root_dir) - finished = False - limit = 150 - rsync_retries = 0 - try: - # Avoid infinite loop unrealistic upper limit, only for rsync failure - while not finished and rsync_retries < limit: - finished = False - pipeline_broke = False - Log.info( - "Rsync launched {0} times. Can take up to 150 retrials or until all data is transferred".format( - rsync_retries + 1)) - try: - p.send_command( - "rsync --timeout=3600 --bwlimit=20000 -aq --remove-source-files " + os.path.join( - p.temp_dir, self.experiment_id) + " " + p.root_dir[:-5]) - except BaseException as e: - Log.debug("{0}".format(str(e))) - rsync_retries += 1 - try: - if p.get_ssh_output_err() == "": - finished = True - elif p.get_ssh_output_err().lower().find("no such file or directory") == -1: - finished = True - else: - finished = False - except Exception as e: - finished = False - pipeline_broke = True - if not pipeline_broke: - if p.get_ssh_output_err().lower().find("no such file or directory") == -1: - finished = True - elif p.get_ssh_output_err().lower().find( - "warning: rsync") != -1 or p.get_ssh_output_err().lower().find( - "closed") != -1 or p.get_ssh_output_err().lower().find( - "broken pipe") != -1 or p.get_ssh_output_err().lower().find( - "directory has vanished") != -1: - rsync_retries += 1 - finished = False - elif p.get_ssh_output_err() == "": - finished = True - else: - error = True - finished = False - break - p.send_command( - "find {0} -depth -type d -empty -delete".format( - os.path.join(p.temp_dir, self.experiment_id))) - Log.result( - "Empty dirs on {0} have been successfully deleted".format(p.temp_dir)) - if finished: - p.send_command("chmod 755 -R " + p.root_dir) - Log.result( - "Files/dirs on {0} have been successfully picked up", p.name) - # p.send_command( - # "find {0} -depth -type d -empty -delete".format(os.path.join(p.temp_dir, experiment_id))) - Log.result( - "Empty dirs on {0} have been successfully deleted".format(p.temp_dir)) - else: - Log.printlog("The files/dirs on {0} cannot be copied to {1}.".format( - os.path.join(p.temp_dir, self.experiment_id), p.root_dir), 6012) - error = True - break - - except IOError as e: - raise AutosubmitError( - "I/O Issues", 6016, e.message) - except BaseException as e: + Log.info(f"Copying remote files/dirs on {p.name}") + Log.info(f"Copying from {p.temp_dir}/{self.experiment_id} to {p.root_dir}") + if not p.move_folder_rsync(Path(p.temp_dir) / self.experiment_id, Path(p.root_dir).parent): error = True - Log.printlog("The files/dirs on {0} cannot be copied to {1}.\nTRACE:{2}".format( - os.path.join(p.temp_dir, self.experiment_id), p.root_dir, str(e)), 6012) break else: - Log.result( - "Files/dirs on {0} have been successfully picked up", p.name) + Log.result(f"Files/dirs on {p.name} have been successfully picked up") + if error: raise AutosubmitCritical( - "Unable to pickup all platforms, the non-moved files are on the TEMP_DIR\n You can try again with autosubmit {0} -p --onlyremote".format( - self.experiment_id), 7012) + f"Unable to pickup all platforms, the non-moved files are on the TEMP_DIR\n You can try again with autosubmit {self.experiment_id} -p --onlyremote", + 7012) else: Log.result("The experiment has been successfully picked up.") Log.info("Checking if the experiment can run:") - as_conf = AutosubmitConfig( - self.experiment_id, self.basic_config, YAMLParserFactory()) + as_conf = AutosubmitConfig(self.experiment_id, self.basic_config, YAMLParserFactory()) try: as_conf.check_conf_files(False) restore_platforms(platforms) - except BaseException as e: - Log.warning(f"Before running, configure your platform settings. Remember that the as_misc pickup platforms aren't load outside the migrate") + except Exception as e: + Log.warning( + "Before running, configure your platform settings. Remember that the as_misc pickup platforms aren't load outside the migrate") Log.warning(f"The experiment cannot run, check the configuration files:\n{e}") return True - def check_migrate_config(self, as_conf, platforms_to_test, pickup_data ): + def _handle_restore_platforms_error(self, e: AutosubmitCritical) -> None: + raise AutosubmitCritical( + e.message + "\nInvalid Remote Platform configuration, recover them manually or:\n 1) Configure platform.yml with the correct info\n 2) autosubmit expid -p --onlyremote", + 7014, e.trace) + + def check_migrate_config(self, as_conf: AutosubmitConfig, platforms_to_test: List[Any], + pickup_data: Dict[str, Dict[str, str]]) -> None: """ - Checks if the configuration file has the necessary information to migrate the data - :param as_conf: Autosubmit configuration file - :param platforms_to_test: platforms to test - :param pickup_data: data to migrate + Checks if the configuration file has the necessary information to migrate the data. + :param as_conf: Autosubmit configuration file. + :param platforms_to_test: List of platforms to test. + :param pickup_data: Data to migrate. + :raises AutosubmitCritical: If there are missing or invalid platform configurations. """ - # check if all platforms_to_test are present in the pickup_data - missing_platforms = set() + missing_platforms = [] scratch_dirs = set() - platforms_to_migrate = dict() + platforms_to_migrate = {} + for platform in platforms_to_test: - if platform.name not in pickup_data.keys(): + if platform.name not in pickup_data: if platform.name.upper() != "LOCAL" and platform.scratch not in scratch_dirs: - missing_platforms.add(platform.name) + missing_platforms.append(platform.name) else: pickup_data[platform.name]["ROOTDIR"] = platform.root_dir platforms_to_migrate[platform.name] = pickup_data[platform.name] scratch_dirs.add(pickup_data[platform.name].get("SCRATCH_DIR", "")) + if missing_platforms: - raise AutosubmitCritical(f"Missing platforms in the offer conf: {missing_platforms}", 7014) - missconf_plaforms = "" - for platform_pickup_name, platform_pickup_data in platforms_to_migrate.items(): - if platform_pickup_name.upper() == "LOCAL": + raise AutosubmitCritical(f"Missing platforms in the offer conf: {','.join(missing_platforms)}", 7014) + + missconf_platforms = "" + for platform_name, platform_data in platforms_to_migrate.items(): + if platform_name.upper() == "LOCAL": continue - Log.info(f"Checking [{platform_pickup_name}] from as_misc configuration files...") - valid_user = as_conf.platforms_data[platform_pickup_name].get("USER", None) and platform_pickup_data.get("USER", None) - if valid_user: - if as_conf.platforms_data[platform_pickup_name].get("USER", None) == platform_pickup_data.get("USER", None): - if platform_pickup_data.get("SAME_USER",False): - valid_user = True - else: - valid_user = False - valid_project = as_conf.platforms_data[platform_pickup_name].get("PROJECT", None) and platform_pickup_data.get("PROJECT", None) - scratch_dir = as_conf.platforms_data[platform_pickup_name].get("SCRATCH_DIR", None) and platform_pickup_data.get("SCRATCH_DIR", None) - valid_host = as_conf.platforms_data[platform_pickup_name].get("HOST", None) and platform_pickup_data.get("HOST", None) - valid_tmp_dir = platform_pickup_data.get("TEMP_DIR", False) + Log.info(f"Checking [{platform_name}] from as_misc configuration files...") + valid_user = self._validate_user(as_conf, platform_name, platform_data) + valid_project = self._validate_field(as_conf, platform_name, platform_data, "PROJECT") + scratch_dir = self._validate_field(as_conf, platform_name, platform_data, "SCRATCH_DIR") + valid_host = self._validate_field(as_conf, platform_name, platform_data, "HOST") + valid_tmp_dir = platform_data.get("TEMP_DIR", False) + if not valid_tmp_dir: continue elif not valid_user or not valid_project or not scratch_dir or not valid_host: - Log.printlog(f" Offer USER: {as_conf.platforms_data[platform_pickup_name].get('USER',None)}\n" - f" Pickup USER: {platform_pickup_data.get('USER',None)}\n" - f" Offer PROJECT: {as_conf.platforms_data[platform_pickup_name].get('PROJECT',None)}\n" - f" Pickup PROJECT: {platform_pickup_data.get('PROJECT',None)}\n" - f" Offer SCRATCH_DIR: {as_conf.platforms_data[platform_pickup_name].get('SCRATCH_DIR',None)}\n" - f" Pickup SCRATCH_DIR: {platform_pickup_data.get('SCRATCH_DIR',None)}\n" - f" Shared TEMP_DIR: {platform_pickup_data.get('TEMP_DIR', '')}\n") - Log.printlog(f"Invalid configuration for platform [{platform_pickup_name}]\nTrying next platform...",Log.ERROR) - missconf_plaforms = missconf_plaforms + f', {platform_pickup_name}' + self._log_invalid_config(as_conf, platform_name, platform_data) + missconf_platforms += f', {platform_name}' else: - Log.info("Valid configuration for platform [{0}]".format(platform_pickup_name)) - Log.result(f"Using platform: [{platform_pickup_name}] to migrate [{pickup_data[platform_pickup_name]['ROOTDIR']}] data") - if missconf_plaforms: - raise AutosubmitCritical(f"Invalid migrate configuration for platforms: {missconf_plaforms[2:]}", 7014) + Log.info(f"Valid configuration for platform [{platform_name}]") + Log.result(f"Using platform: [{platform_name}] to migrate [{platform_data['ROOTDIR']}] data") + + if missconf_platforms: + raise AutosubmitCritical(f"Invalid migrate configuration for platforms: {missconf_platforms[2:]}", 7014) + + @staticmethod + def _validate_user(as_conf: AutosubmitConfig, platform_name: str, platform_data: Dict[str, str]) -> bool: + user_offer = as_conf.platforms_data[platform_name].get("USER", None) + user_pickup = platform_data.get("USER", None) + if user_offer and user_pickup: + return user_offer == user_pickup and platform_data.get("SAME_USER", False) + return False + + @staticmethod + def _validate_field(as_conf: AutosubmitConfig, platform_name: str, platform_data: Dict[str, str], + field: str) -> bool: + return as_conf.platforms_data[platform_name].get(field, None) and platform_data.get(field, None) + + def _log_invalid_config(self, as_conf: AutosubmitConfig, platform_name: str, platform_data: Dict[str, str]) -> None: + Log.printlog(f" Offer USER: {as_conf.platforms_data[platform_name].get('USER', None)}\n" + f" Pickup USER: {platform_data.get('USER', None)}\n" + f" Offer PROJECT: {as_conf.platforms_data[platform_name].get('PROJECT', None)}\n" + f" Pickup PROJECT: {platform_data.get('PROJECT', None)}\n" + f" Offer SCRATCH_DIR: {as_conf.platforms_data[platform_name].get('SCRATCH_DIR', None)}\n" + f" Pickup SCRATCH_DIR: {platform_data.get('SCRATCH_DIR', None)}\n" + f" Shared TEMP_DIR: {platform_data.get('TEMP_DIR', '')}\n") + Log.printlog(f"Invalid configuration for platform [{platform_name}]\nTrying next platform...", Log.ERROR) + + @staticmethod + def load_platforms_in_use(as_conf: AutosubmitConfig) -> List[Any]: + """ + Load the platforms in use for the experiment. - def load_platforms_in_use(self, as_conf): + This function retrieves the platforms used in the experiment based on the configuration. + It raises an exception if no platforms are configured. + + :param as_conf: Autosubmit configuration file. + :return: A list of platforms in use, excluding the local platform. + :raises AutosubmitCritical: If no platforms are configured. + """ platforms_to_test = set() submitter = _get_submitter(as_conf) submitter.load_platforms(as_conf) @@ -216,146 +186,169 @@ class Migrate: raise AutosubmitCritical("No platforms configured!!!", 7014) platforms = submitter.platforms for job_data in as_conf.experiment_data["JOBS"].values(): - platforms_to_test.add(platforms[job_data.get("PLATFORM", as_conf.experiment_data.get("DEFAULT", {}).get("HPCARCH", "")).upper()]) - return [ platform for platform in platforms_to_test if platform.name != "local" ] + platform_name = job_data.get("PLATFORM", + as_conf.experiment_data.get("DEFAULT", {}).get("HPCARCH", "")).upper() + platforms_to_test.add(platforms[platform_name]) + return [platform for platform in platforms_to_test if platform.name != "local"] + + def migrate_pickup_jobdata(self) -> None: + """ + Unarchive job data files for the experiment. - def migrate_pickup_jobdata(self): - # Unarchive job_data_{expid}.tar + This function unarchives the job data files (`job_data_{expid}.tar`) into the job data directory. + It checks for the existence of the tar file and extracts its contents if it exists. + + :raises AutosubmitCritical: If the tar file cannot be read. + """ Log.info(f'Unarchiving job_data_{self.experiment_id}.tar') - job_data_dir = f"{self.basic_config.JOBDATA_DIR}/job_data_{self.experiment_id}" - if os.path.exists(os.path.join(self.basic_config.JOBDATA_DIR, f"{self.experiment_id}_jobdata.tar")): + job_data_dir = Path(self.basic_config.JOBDATA_DIR) / f"job_data_{self.experiment_id}" + tar_file_path = Path(self.basic_config.JOBDATA_DIR) / f"{self.experiment_id}_jobdata.tar" + + if tar_file_path.exists(): try: - with tarfile.open(os.path.join(self.basic_config.JOBDATA_DIR, f"{self.experiment_id}_jobdata.tar", 'r')) as tar: + with tarfile.open(tar_file_path, 'r') as tar: tar.extractall(path=job_data_dir) tar.close() - os.remove(os.path.join(self.basic_config.JOBDATA_DIR, f"{self.experiment_id}_jobdata.tar")) + tar_file_path.unlink() except Exception as e: raise AutosubmitCritical("Can not read tar file", 7012, str(e)) - def migrate_offer_jobdata(self): - # archive job_data_{expid}.db and job_data_{expid}.sql + def migrate_offer_jobdata(self) -> bool: + """ + Archive job data files for the experiment. + + This function archives the job data files (`job_data_{expid}.db` and `job_data_{expid}.sql`) + into a tar file named `job_data_{expid}.tar`. It checks for the existence of the job data files + and creates the tar file if they exist. + + :raises AutosubmitCritical: If the tar file cannot be written. + :return: True if the job data is archived successfully. + """ Log.info(f'Archiving job_data_{self.experiment_id}.db and job_data_{self.experiment_id}.sql') - job_data_dir = f"{self.basic_config.JOBDATA_DIR}/job_data_{self.experiment_id}" - # Creating tar file + job_data_dir = Path(self.basic_config.JOBDATA_DIR) / f"job_data_{self.experiment_id}" Log.info("Creating tar file ... ") try: compress_type = "w" - output_filepath = f'{self.experiment_id}_jobdata.tar' - db_exists = os.path.exists(f"{job_data_dir}.db") - sql_exists = os.path.exists(f"{job_data_dir}.sql") - if os.path.exists(os.path.join(self.basic_config.JOBDATA_DIR, output_filepath)) and (db_exists or sql_exists): - os.remove(os.path.join(self.basic_config.JOBDATA_DIR, output_filepath)) + output_filepath = Path(self.basic_config.JOBDATA_DIR) / f'{self.experiment_id}_jobdata.tar' + db_exists = (job_data_dir / f"{self.experiment_id}.db").exists() + sql_exists = (job_data_dir / f"{self.experiment_id}.sql").exists() + if output_filepath.exists() and (db_exists or sql_exists): + output_filepath.unlink() elif db_exists or sql_exists: - with tarfile.open(os.path.join(self.basic_config.JOBDATA_DIR, output_filepath), compress_type) as tar: + with tarfile.open(output_filepath, compress_type) as tar: if db_exists: - tar.add(f"{job_data_dir}.db", arcname=f"{self.experiment_id}.db") + tar.add(job_data_dir / f"{self.experiment_id}.db", arcname=f"{self.experiment_id}.db") if sql_exists: - tar.add(f"{job_data_dir}.sql", arcname=f"{self.experiment_id}.sql") + tar.add(job_data_dir / f"{self.experiment_id}.sql", arcname=f"{self.experiment_id}.sql") tar.close() - os.chmod(os.path.join(self.basic_config.JOBDATA_DIR, output_filepath), 0o775) + output_filepath.chmod(0o775) except Exception as e: raise AutosubmitCritical("Can not write tar file", 7012, str(e)) Log.result("Job data archived successfully") return True - def migrate_offer_remote(self): - exit_with_errors = False - # Init the configuration - as_conf = AutosubmitConfig(self.experiment_id, self.basic_config, YAMLParserFactory()) - as_conf.check_conf_files(False) - # Load migrate - #Find migrate file - pickup_data = as_conf.misc_data.get("PLATFORMS",{}) - if not pickup_data: - raise AutosubmitCritical("No migrate information found", 7014) - - # Merge platform keys with migrate keys that should be the old credentials - # Migrate file consist of: - # platform_name: must match the platform name in the platforms configuration file, must have the old user - # USER: user - # PROJECT: project - # Host ( optional ) : host of the machine if using alias - # TEMP_DIR: temp dir for current platform, because can be different for each of the + def migrate_offer_remote(self) -> None: + """ + Perform the migration of remote platforms for the experiment. + This function initializes the configuration, loads the migration data, checks the configuration, + and performs the migration of remote platforms. It handles the conversion of absolute symlinks + to relative ones and moves the files/directories to the specified temporary directories. + Merge platform keys with migrate keys that should be the old credentials. + Migrate file consists of: + - platform_name: must match the platform name in the platforms configuration file, must have the old user + - USER: user + - PROJECT: project + - Host (optional): host of the machine if using alias + - TEMP_DIR: temp dir for current platform, because it can be different for each of them + :raises AutosubmitCritical: If no migration information is found or if there are issues with the platforms. + """ + as_conf = self._initialize_configuration() + pickup_data = self._load_migrate_data(as_conf) platforms_to_test = self.load_platforms_in_use(as_conf) - Log.info('Migrating experiment {0}'.format(self.experiment_id)) + Log.info(f'Migrating experiment {self.experiment_id}') Log.info("Checking remote platforms") self.check_migrate_config(as_conf, platforms_to_test, pickup_data) - # establish the connection to all platforms on use restore_platforms(platforms_to_test) - platforms_with_issues = list() + platforms_with_issues = self._migrate_platforms(platforms_to_test, pickup_data) + if platforms_with_issues: + raise AutosubmitCritical(f'Platforms with issues: {platforms_with_issues}', 7014) + + def _initialize_configuration(self) -> AutosubmitConfig: + as_conf = AutosubmitConfig(self.experiment_id, self.basic_config, YAMLParserFactory()) + as_conf.check_conf_files(False) + return as_conf + + @staticmethod + def _load_migrate_data(as_conf: AutosubmitConfig) -> Dict[str, Dict[str, str]]: + pickup_data = as_conf.misc_data.get("PLATFORMS", {}) + if not pickup_data: + raise AutosubmitCritical("No migrate information found", 7014) + return pickup_data + + def _migrate_platforms(self, platforms_to_test: List[Any], pickup_data: Dict[str, Dict[str, str]]) -> List[str]: + platforms_with_issues = [] for p in platforms_to_test: if p.temp_dir == "": p.temp_dir = pickup_data.get(p.name, {}).get("TEMP_DIR", "") Log.info(f"Using temp dir: {p.temp_dir}") if p.root_dir != p.temp_dir and len(p.temp_dir) > 0: - try: - Log.info(f"Converting the absolute symlinks into relatives on platform [{p.name}] ") - command = f"cd {p.remote_log_dir} ; find {p.root_dir} -type l -lname '/*' -printf 'var=\"$(realpath -s --relative-to=\"%p\" \"$(readlink \"%p\")\")\" && var=${{var:3}} && ln -sf $var \"%p\" \\n' > convertLink.sh" - try: - p.check_absolute_file_exists(p.temp_dir) - except Exception: - exit_with_errors = True - Log.printlog(f'{p.temp_dir} does not exist on platform [{p.name}]', 7014) - platforms_with_issues.append(p.name) - continue - thread = p.send_command_non_blocking(f"{command} ", True) - # has thread end? - start_time = time.time() - Log.info(f"Waiting for the absolute symlinks conversion to finish on platform [{p.name}]") - while thread.is_alive(): - current_time = time.time() - elapsed_time = current_time - start_time - if elapsed_time >= 10: - Log.info(f"Waiting for the absolute symlinks conversion to finish on platform [{p.name}]") - start_time = time.time() # reset the start time - time.sleep(1) - p.send_command(f"cd {p.remote_log_dir} ; cat convertLink.sh", True) - ssh_output = p.get_ssh_output() - if ssh_output.startswith("var="): - command = f"cd {p.remote_log_dir} ; chmod +x convertLink.sh ; ./convertLink.sh ; rm convertLink.sh" - p.send_command(command, True) - Log.result(f"Absolute symlinks converted on platform [{p.name}]") - else: - Log.result(f"No absolute symlinks found in [{p.root_dir}] for platform [{p.name}]") - except IOError: - Log.result(f"No absolute symlinks found in [{p.root_dir}] for platform [{p.name}]") - except AutosubmitError: - raise - except AutosubmitCritical: - raise - except BaseException as e: - exit_with_errors = True - error = str(e) + "\n" + p.get_ssh_output_err() - Log.printlog(f"Absolute symlinks failed to convert due to [{str(error)}] on platform [{p.name}]", - 7014) + if not self._convert_symlinks(p): platforms_with_issues.append(p.name) + continue + if not self._move_files(p): + platforms_with_issues.append(p.name) + return platforms_with_issues + + @staticmethod + def _convert_symlinks(platform: Any) -> bool: + try: + Log.info(f"Converting the absolute symlinks into relatives on platform [{platform.name}]") + command = f"cd {platform.remote_log_dir} ; find {platform.root_dir} -type l -lname '/*' -printf 'var=\"$(realpath -s --relative-to=\"%p\" \"$(readlink \"%p\")\")\" && var=${{var:3}} && ln -sf $var \"%p\" \\n' > convertLink.sh" + if not platform.check_absolute_file_exists(platform.temp_dir): + Log.printlog(f'{platform.temp_dir} does not exist on platform [{platform.name}]', 7014) + return False + thread = platform.send_command_non_blocking(f"{command} ", True) + start_time = time.time() + while thread.is_alive(): + if time.time() - start_time >= 10: + Log.info(f"Waiting for the absolute symlinks conversion to finish on platform [{platform.name}]") + start_time = time.time() + time.sleep(1) + platform.send_command(f"cd {platform.remote_log_dir} ; cat convertLink.sh", True) + ssh_output = platform.get_ssh_output() + if ssh_output.startswith("var="): + command = f"cd {platform.remote_log_dir} ; chmod +x convertLink.sh ; ./convertLink.sh ; rm convertLink.sh" + platform.send_command(command, True) + Log.result(f"Absolute symlinks converted on platform [{platform.name}]") + else: + Log.result(f"No absolute symlinks found in [{platform.root_dir}] for platform [{platform.name}]") + return True + except Exception as e: + Log.printlog(f"Absolute symlinks failed to convert due to [{str(e)}] on platform [{platform.name}]", 7014) + return False + + def _move_files(self, platform: Any) -> bool: + tmp_experiment_path = Path(platform.temp_dir).joinpath(self.experiment_id) - break - # If there are no errors in the conversion of the absolute symlinks, then move the files of this platform - try: - Log.info(f"Moving remote files/dirs on platform [{p.name}] to [{p.temp_dir}]") - p.send_command(f"chmod 777 -R {p.root_dir}") - p.send_command(f"mkdir -p {p.temp_dir}") - p.send_command(f"chmod 777 -R {p.temp_dir}") - if p.check_absolute_file_exists(os.path.join(p.root_dir, self.experiment_id)): - if p.check_absolute_file_exists(os.path.join(p.temp_dir, self.experiment_id)): - Log.printlog(f"Directory [{os.path.join(p.temp_dir, self.experiment_id)}] already exists. New data won't be moved until you move the old data", 6000) - platforms_with_issues.append(p.name) - break - if not p.move_file(p.root_dir, os.path.join(p.temp_dir, self.experiment_id), False): - Log.result(f"No data found in [{p.root_dir}] for platform [{p.name}]") - else: - Log.result( - f"Remote files/dirs on platform [{p.name}] have been successfully moved to [{p.temp_dir}]") - except BaseException as e: - exit_with_errors = True + try: + Log.info(f"Moving remote files/dirs on platform [{platform.name}] to [{platform.temp_dir}]") + platform.send_command(f"chmod 777 -R {platform.root_dir}") + platform.send_command(f"mkdir -p {platform.temp_dir}") + platform.send_command(f"chmod 777 -R {platform.temp_dir}") + if platform.check_absolute_file_exists(platform.root_dir): + if platform.check_absolute_file_exists(tmp_experiment_path): Log.printlog( - f"Cant move files/dirs on platform [{p.name}] to [{p.temp_dir}] due to [{str(e)}]", + f"Directory [{str(tmp_experiment_path)}] already exists. New data won't be moved until you move the old data", 6000) - platforms_with_issues.append(p.name) - break - Log.result(f"Platform [{p.name}] has been successfully migrated") - if exit_with_errors: - raise AutosubmitCritical(f'Platforms with issues: {platforms_with_issues}', 7014) - + return False + if not platform.move_file(platform.root_dir, tmp_experiment_path, False): + if not platform.move_folder_rsync(platform.root_dir, platform.temp_dir): + Log.result(f"No data found in [{platform.root_dir}] for platform [{platform.name}]") + Log.result( + f"Remote files/dirs on platform [{platform.name}] have been successfully moved to [{str(tmp_experiment_path)}]") + return True + except Exception as e: + Log.printlog( + f"Cant move files/dirs on platform [{platform.name}] to [{str(tmp_experiment_path)}] due to [{str(e)}]", 6000) + return False diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index dd5954a6f6c17488dfe18d8e8956f0661ab2da75..54a20c8fd2d17bff47f04dec50aa151fe479b75c 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -263,6 +263,8 @@ class EcPlatform(ParamikoPlatform): return True def move_file(self, src, dest, must_exist = False): + src = str(src) + dest = str(dest) command = "ecaccess-file-move {0}:{1} {0}:{2}".format(self.host,os.path.join(self.remote_log_dir,src) , os.path.join(self.remote_log_dir,dest)) try: retries = 0 diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index 3d9296705d5731faf34b7d053791dc880018321e..bb984b488582620ea5400db99185c367d4d161cf 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -243,6 +243,8 @@ class LocalPlatform(ParamikoPlatform): :param must_exist: ignore if file exist or not :type dest: str """ + src = str(src) + dest = str(dest) path_root = "" try: path_root = self.get_files_path() diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index b6b4cdeaafccdd87ad4f9d28e8ca4b9d2d5b2ce3..8b64b1ea72c82556804f5f3ecc456beb60d97dab 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -483,6 +483,8 @@ class ParamikoPlatform(Platform): :param must_exist: ignore if file exist or not :type dest: str """ + src = str(src) + dest = str(dest) path_root="" try: path_root = self.get_files_path() @@ -515,6 +517,61 @@ class ParamikoPlatform(Platform): os.path.join(self.get_files_path(), src)), 5001) return False + def move_folder_rsync(self, src: str, dest: str, retries_limit: int = 25) -> bool: + """ + Perform a remote rsync operation with retries. + + :param src: Source directory to sync from. + :param dest: Destination directory to sync to. + :param retries_limit: Maximum number of retries to perform. If it keeps failing, user can prompt the command again. + :return: True if the rsync operation is successful, False otherwise. + """ + finished = False + rsync_retries = 0 + if not Path(src).parent: + src = Path(f"{self.get_files_path()}/{src}") + + while not finished and rsync_retries < retries_limit: + Log.info( + f"Rsync launched {rsync_retries + 1} times. Can take up to 150 retrials or until all data is transferred") + finished = self._attempt_rsync(src, dest) + if not finished: + rsync_retries += 1 + self._handle_rsync_failure(src) + + if not finished or rsync_retries >= retries_limit: + Log.error(f"Rsync operation failed after {rsync_retries} retries") + return finished + + def _attempt_rsync(self, src: str, dest: str) -> bool: + """ + Attempt to perform rsync and check for errors. + + :param src: Source directory to sync from. + :param dest: Destination directory to sync to. + :return: True if rsync is successful, False otherwise. + """ + try: + self.send_command(f"rsync --timeout=3600 --bwlimit=20000 -aqz --remove-source-files {src} {dest}") + self.send_command(f"find {src} -type d -empty -delete") + if self.get_ssh_output_err() == "" or "no such file or directory" not in self.get_ssh_output_err().lower(): + return True + except BaseException as e: + Log.debug(f"{str(e)}") + return False + + def _handle_rsync_failure(self, src: str): + """ + Handle rsync failure by checking for specific error messages and cleaning up. + + :param src: Source directory to sync from. + """ + if self.get_ssh_output_err() == "" or any(keyword in self.get_ssh_output_err().lower() for keyword in + ["warning: rsync", "closed", "broken pipe", "directory has vanished"]): + return + self.send_command(f"find {src} -depth -type d -empty -delete") + Log.result(f"Empty dirs on {src} have been successfully deleted") + def submit_job(self, job, script_name, hold=False, export="none"): """ Submit a job from a given job object. @@ -1434,12 +1491,13 @@ class ParamikoPlatform(Platform): def check_absolute_file_exists(self, src): try: - if self._ftpChannel.stat(src): + if self._ftpChannel.stat(str(src)): return True else: return False except Exception: return False + class ParamikoPlatformException(Exception): """ Exception raised from HPC queues diff --git a/docs/source/troubleshooting/error-codes.rst b/docs/source/troubleshooting/error-codes.rst index 5d15cc3e77ee8a1214e20ae99c289c2641255786..e5cb5ec008378593368d16f4a12812377a0d81d3 100644 --- a/docs/source/troubleshooting/error-codes.rst +++ b/docs/source/troubleshooting/error-codes.rst @@ -208,6 +208,8 @@ team in Git. +------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | 7074 | Profiling process failed | You can find more detailed information in the logs, as well as hints to solve the problem | +------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| 7075 | I/O error | Check that your filesystem is not full or has any other issue. | ++------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ .. note:: Please submit an issue to the Autosubmit team if you have not found your error diff --git a/docs/source/userguide/manage/index.rst b/docs/source/userguide/manage/index.rst index f1607a12ed1a21686bc00c3943e5024cdb74252f..83ea6998f5cc8ddfe4ad49e739b9552677dd6657 100644 --- a/docs/source/userguide/manage/index.rst +++ b/docs/source/userguide/manage/index.rst @@ -222,6 +222,12 @@ Migrate file example: $expid/conf/migrate.yml Example for a RES account to BSC account the tmp folder must have rwx|rwx|--- permissions. The temporary directory must be in the same filesystem. +.. warning:: Autosubmit will use rsync to copy the data during the offer as a last resort. + +.. warning:: Autosubmit will use rsync to transfer the files' ownership from the former remote platform to the newer one during the pickup. + +When rsync is activated, it will try to move the data up to 25 tries afterward, if some data is still not moved, you can perform ``autosubmit migrate -o or -p $expid`` again. + User A, To offer the experiment: :: diff --git a/test/unit/test_migrate.py b/test/unit/test_migrate.py index 92f49b5ec4c381af775f771c4fea63d00e057879..8cbe1dadc6913bbf34e5e4b6564dda66fea11acb 100644 --- a/test/unit/test_migrate.py +++ b/test/unit/test_migrate.py @@ -11,13 +11,14 @@ from log.log import AutosubmitCritical from test.unit.utils.common import create_database, init_expid - -@pytest.mark.skip('This test requires a running SSH server, with password-less authentication') +# TODO: Write the tests without the class and self ( to do after the transition to github) +# TODO: Isolate the tests ( to do after the transition to github) rsync one works if only it is launched +@pytest.mark.skip("Pipeline needs to allow to connect to itself through ssh") class TestMigrate: @pytest.fixture(scope='class') def migrate_tmpdir(self, tmpdir_factory): - folder = tmpdir_factory.mktemp(f'migrate_tests') + folder = tmpdir_factory.mktemp('migrate_tests') os.mkdir(folder.join('scratch')) os.mkdir(folder.join('migrate_tmp_dir')) file_stat = os.stat(f"{folder.strpath}") @@ -54,7 +55,7 @@ path = {folder} os.environ['AUTOSUBMIT_CONFIGURATION'] = str(folder.join('autosubmitrc')) create_database(str(folder.join('autosubmitrc'))) assert "tests.db" in [Path(f).name for f in folder.listdir()] - init_expid(str(folder.join('autosubmitrc')), platform='pytest-local',create=False) + init_expid(str(folder.join('autosubmitrc')), platform='pytest-local', create=False, test_type='test') assert "t000" in [Path(f).name for f in folder.listdir()] return folder @@ -124,56 +125,72 @@ PLATFORMS: def test_migrate_conf_good_config(self, migrate_prepare_test_conf): # Test OK - as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf + as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"]) as_conf.misc_data["PLATFORMS"]["PYTEST-LOCAL"]["TEMP_DIR"] = "" migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"]) def test_migrate_no_platforms(self, migrate_prepare_test_conf): - as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf + as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf as_conf.misc_data["PLATFORMS"] = {} with pytest.raises(AutosubmitCritical): migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"]) def test_migrate_no_scratch_dir(self, migrate_prepare_test_conf): - as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf + as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf as_conf.misc_data["PLATFORMS"]["PYTEST-LOCAL"]["SCRATCH_DIR"] = "" with pytest.raises(AutosubmitCritical): migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"]) def test_migrate_no_project(self, migrate_prepare_test_conf): - as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf + as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf as_conf.misc_data["PLATFORMS"]["PYTEST-LOCAL"]["PROJECT"] = "" with pytest.raises(AutosubmitCritical): migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"]) def test_migrate_no_same_user(self, migrate_prepare_test_conf): - as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf + as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf as_conf.misc_data["PLATFORMS"]["PYTEST-LOCAL"]["SAME_USER"] = False with pytest.raises(AutosubmitCritical): migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"]) def test_migrate_no_user(self, migrate_prepare_test_conf): - as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf + as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf as_conf.misc_data["PLATFORMS"]["PYTEST-LOCAL"]["USER"] = "" with pytest.raises(AutosubmitCritical): migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"]) def test_migrate_no_host(self, migrate_prepare_test_conf): - as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf + as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf as_conf.misc_data["PLATFORMS"]["PYTEST-LOCAL"]["HOST"] = "" with pytest.raises(AutosubmitCritical): migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"]) + # TODO: parametrize the test with the one below, but right now it's not working due not being well isolated ( to do after the transition to github) def test_migrate_remote(self, migrate_remote_only, migrate_tmpdir): # Expected behavior: migrate everything from scratch/whatever to scratch/whatever_new assert migrate_tmpdir.join(f'scratch/whatever/{migrate_tmpdir.owner}/t000').check(dir=True) assert migrate_tmpdir.join(f'scratch/whatever_new/{migrate_tmpdir.owner}/t000').check(dir=False) assert "dummy data" == migrate_tmpdir.join( f'scratch/whatever/{migrate_tmpdir.owner}/t000/real_data/dummy_symlink').read() + migrate_remote_only.migrate_offer_remote() + assert migrate_tmpdir.join('migrate_tmp_dir/t000').check(dir=True) + migrate_remote_only.migrate_pickup() + assert migrate_tmpdir.join(f'scratch/whatever/{migrate_tmpdir.owner}/t000').check(dir=False) + assert migrate_tmpdir.join(f'scratch/whatever_new/{migrate_tmpdir.owner}/t000').check(dir=True) + assert "dummy data" == migrate_tmpdir.join( + f'scratch/whatever_new/{migrate_tmpdir.owner}/t000/real_data/dummy_symlink').read() + def test_migrate_remote_rsync(self, migrate_remote_only, migrate_tmpdir, mocker): + # Expected behavior: migrate everything from scratch/whatever to scratch/whatever_new + assert migrate_tmpdir.join(f'scratch/whatever/{migrate_tmpdir.owner}/t000').check(dir=True) + assert migrate_tmpdir.join(f'scratch/whatever_new/{migrate_tmpdir.owner}/t000').check(dir=False) + assert "dummy data" == migrate_tmpdir.join( + f'scratch/whatever/{migrate_tmpdir.owner}/t000/real_data/dummy_symlink').read() + mocker.patch('autosubmit.platforms.paramiko_platform.ParamikoPlatform.move_file', return_value=False) migrate_remote_only.migrate_offer_remote() - assert migrate_tmpdir.join(f'migrate_tmp_dir/t000').check(dir=True) + assert migrate_tmpdir.join('migrate_tmp_dir/t000').check(dir=True) + assert not migrate_tmpdir.join(f'scratch/whatever/{migrate_tmpdir.owner}/t000').check(dir=True) migrate_remote_only.migrate_pickup() assert migrate_tmpdir.join(f'scratch/whatever/{migrate_tmpdir.owner}/t000').check(dir=False) assert migrate_tmpdir.join(f'scratch/whatever_new/{migrate_tmpdir.owner}/t000').check(dir=True)