diff --git a/VERSION b/VERSION index 7e32265b89f353d53583262ae7b29437ae400f97..6e0567827e382facfdb35d7a97c80f1cad723bce 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.15.18 \ No newline at end of file +3.15.19 \ No newline at end of file diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index e5f07216a9c915c28243f09b3288ba9401a09f82..958303eaa209317d9b7b68a126263433831ff62d 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -17,7 +17,7 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . from __future__ import print_function - +from helpers.utils import as_rsync import requests import threading import traceback @@ -2967,6 +2967,8 @@ class Autosubmit: return True + + @staticmethod def migrate(experiment_id, offer, pickup, only_remote): """ @@ -3073,29 +3075,36 @@ class Autosubmit: Log.result( "[OPTIONAL] HOST_TO directive not found. The directive HOST will remain unchanged") p = submitter.platforms[platform] - if p.temp_dir not in already_moved: + if p.root_dir not in already_moved: if p.root_dir != p.temp_dir and len(p.temp_dir) > 0: - already_moved.add(p.temp_dir) + already_moved.add(p.root_dir) # find /home/bsc32/bsc32070/dummy3 -type l -lname '/*' -printf ' ln -sf "$(realpath -s --relative-to="%p" $(readlink "%p")")" \n' > script.sh # command = "find " + p.root_dir + " -type l -lname \'/*\' -printf 'var=\"$(realpath -s --relative-to=\"%p\" \"$(readlink \"%p\")\")\" && var=${var:3} && ln -sf $var \"%p\" \\n'" Log.info( "Converting the absolute symlinks into relatives on platform {0} ", platform) - command = "find " + p.root_dir + \ + link_finder = "find " + p.root_dir + \ " -type l -lname \'/*\' -printf 'var=\"$(realpath -s --relative-to=\"%p\" \"$(readlink \"%p\")\")\" && var=${var:3} && ln -sf $var \"%p\" \\n' " try: - p.send_command(command, True) - if p.get_ssh_output().startswith("var="): + p.send_command(link_finder, True) + retrials = 2 + while p.get_ssh_output().startswith("var=") and retrials > 0: convertLinkPath = os.path.join( BasicConfig.LOCAL_ROOT_DIR, experiment_id, BasicConfig.LOCAL_TMP_DIR, 'convertLink.sh') with open(convertLinkPath, 'w') as convertLinkFile: convertLinkFile.write(p.get_ssh_output()) + try: + Log.debug("Link\n{0}", p.get_ssh_output()) + except: + pass p.send_file("convertLink.sh") convertLinkPathRemote = os.path.join( p.remote_log_dir, "convertLink.sh") command = "chmod +x " + convertLinkPathRemote + " && " + \ convertLinkPathRemote + " && rm " + convertLinkPathRemote p.send_command(command, True) + p.send_command(link_finder, True) + retrials = retrials - 1 else: Log.result("No links found in {0} for [{1}] ".format( p.root_dir, platform)) @@ -3112,9 +3121,15 @@ class Autosubmit: Log.info( "Moving remote files/dirs on {0}", platform) p.send_command("chmod 777 -R " + p.root_dir) - if not p.move_file(p.root_dir, os.path.join(p.temp_dir, experiment_id), False): - Log.result("No data found in {0} for [{1}]\n".format( - p.root_dir, platform)) + if not p.move_file(p.root_dir, os.path.join(p.temp_dir,experiment_id), False, path_root=""): + if not as_rsync(p,p.root_dir, p.temp_dir): + Log.printlog("The files/dirs on {0} cannot be moved to {1}.".format(p.root_dir, + os.path.join( + p.temp_dir, + experiment_id), + 6012)) + Log.result("Data on {0} has been successfully moved".format(p.root_dir)) + except IOError as e: Log.printlog("The files/dirs on {0} cannot be moved to {1}.".format(p.root_dir, os.path.join(p.temp_dir, @@ -3235,55 +3250,9 @@ class Autosubmit: "Copying remote files/dirs on {0}", platform) Log.info("Copying from {0} to {1}", os.path.join( p.temp_dir, experiment_id), p.root_dir) - finished = False - limit = 150 - rsync_retries = 0 try: - # Avoid infinite loop unrealistic upper limit, only for rsync failure - while not finished and rsync_retries < limit: - finished = False - pipeline_broke = False - Log.info( - "Rsync launched {0} times. Can take up to 150 retrials or until all data is transfered".format( - rsync_retries + 1)) - try: - p.send_command( - "rsync --timeout=3600 --bwlimit=20000 -aq --remove-source-files " + os.path.join( - p.temp_dir, experiment_id) + " " + p.root_dir[:-5]) - except BaseException as e: - Log.debug("{0}".format(str(e))) - rsync_retries += 1 - try: - if p.get_ssh_output_err() == "": - finished = True - elif p.get_ssh_output_err().lower().find("no such file or directory") == -1: - finished = True - else: - finished = False - except: - finished = False - pipeline_broke = True - if not pipeline_broke: - if p.get_ssh_output_err().lower().find("no such file or directory") == -1: - finished = True - elif p.get_ssh_output_err().lower().find( - "warning: rsync") != -1 or p.get_ssh_output_err().lower().find( - "closed") != -1 or p.get_ssh_output_err().lower().find( - "broken pipe") != -1 or p.get_ssh_output_err().lower().find( - "directory has vanished") != -1: - rsync_retries += 1 - finished = False - elif p.get_ssh_output_err() == "": - finished = True - else: - error = True - finished = False - break - p.send_command( - "find {0} -depth -type d -empty -delete".format( - os.path.join(p.temp_dir, experiment_id))) - Log.result( - "Empty dirs on {0} have been successfully deleted".format(p.temp_dir)) + finished = as_rsync(p, os.path.join( + p.temp_dir, experiment_id), p.root_dir[:-5]) if finished: p.send_command("chmod 755 -R " + p.root_dir) Log.result( @@ -3296,8 +3265,6 @@ class Autosubmit: Log.printlog("The files/dirs on {0} cannot be copied to {1}.".format( os.path.join(p.temp_dir, experiment_id), p.root_dir), 6012) error = True - break - except IOError as e: raise AutosubmitError( "I/O Issues", 6016, e.message) diff --git a/autosubmit/helpers/utils.py b/autosubmit/helpers/utils.py index 0ce27ab8a783713eb302dd026a7161a5f27f49db..8b62bc9a830c08fd3f63d631ffd1d570d5c5b873 100644 --- a/autosubmit/helpers/utils.py +++ b/autosubmit/helpers/utils.py @@ -27,4 +27,53 @@ def check_experiment_ownership(expid, basic_config, raise_error=False, logger=No is_eadmin = False if not is_owner and raise_error: raise AutosubmitCritical("You don't own the experiment {0}.".format(expid), 7012) - return is_owner, is_eadmin, current_owner_name \ No newline at end of file + return is_owner, is_eadmin, current_owner_name + + +def as_rsync(p, src, dest): + finished = False + limit = 150 + rsync_retries = 0 + # Avoid infinite loop unrealistic upper limit, only for rsync failure + while not finished and rsync_retries < limit: + finished = False + pipeline_broke = False + Log.info( + "Rsync launched {0} times. Can take up to 150 retrials or until all data is transfered".format( + rsync_retries + 1)) + try: + p.send_command( + "rsync --timeout=3600 --bwlimit=20000 -aqz --remove-source-files " + src + " " + dest) + except BaseException as e: + Log.debug("{0}".format(str(e))) + rsync_retries += 1 + try: + if p.get_ssh_output_err() == "": + finished = True + elif p.get_ssh_output_err().lower().find("no such file or directory") == -1: + finished = True + else: + finished = False + except: + finished = False + pipeline_broke = True + if not pipeline_broke: + if p.get_ssh_output_err().lower().find("no such file or directory") == -1: + finished = True + elif p.get_ssh_output_err().lower().find( + "warning: rsync") != -1 or p.get_ssh_output_err().lower().find( + "closed") != -1 or p.get_ssh_output_err().lower().find( + "broken pipe") != -1 or p.get_ssh_output_err().lower().find( + "directory has vanished") != -1: + rsync_retries += 1 + finished = False + elif p.get_ssh_output_err() == "": + finished = True + else: + error = True + finished = False + break + p.send_command("find {0} -depth -type d -empty -delete".format(src)) + Log.result( + "Empty dirs on {0} have been successfully deleted".format(src)) + return finished \ No newline at end of file diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index b451e321c09b8e49e17693ddb31d7702ff9c790f..ab2e566d33de3fecab77e46d3f389d63b1e39a19 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1112,7 +1112,6 @@ class Job(object): self.hyperthreading = str(as_conf.get_hyperthreading(self.section)).lower() if self.hyperthreading is 'none': self.hyperthreading = str(job_platform.hyperthreading).lower() - if job_platform.processors_per_node is not None and int(self.tasks) > int(job_platform.processors_per_node): self.tasks = job_platform.processors_per_node self.tasks = str(self.tasks) diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 9515692e14da07eb5749d3ac19de25612bbeee65..5fa75687917ac1e2b2a5bdc71e1f8819966aaa6c 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -236,7 +236,7 @@ class EcPlatform(ParamikoPlatform): raise AutosubmitError('Could not send file {0} to {1}'.format(os.path.join(self.tmp_path, filename),os.path.join(self.get_files_path(), filename)),6005,e.message) return True - def move_file(self, src, dest, must_exist = False): + def move_file(self, src, dest, must_exist = False, path_root = None): command = "ecaccess-file-move {0}:{1} {0}:{2}".format(self.host,os.path.join(self.remote_log_dir,src) , os.path.join(self.remote_log_dir,dest)) try: retries = 0 diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index a51b8882a28d9e1e6dfcf47bb9e93267a365b5b2..d6b0432832ee5f7046f99012f7b9e023b1fbb68e 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -198,7 +198,7 @@ class LocalPlatform(ParamikoPlatform): Log.debug('Could not remove file {0}'.format(os.path.join(self.tmp_path, filename))) return False return True - def move_file(self, src, dest, must_exist=False): + def move_file(self, src, dest, must_exist=False, path_root=None): """ Moves a file on the platform (includes .err and .out) :param src: source name @@ -206,6 +206,7 @@ class LocalPlatform(ParamikoPlatform): :param dest: destination name :param must_exist: ignore if file exist or not :type dest: str + """ try: path_root = self.get_files_path() diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index dcad9cdebb2feee9f363b7553ff1f5e3e8f67a36..3bb4b8c49ef4480c4eeadb78b880c75058906a10 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -18,7 +18,7 @@ from paramiko.ssh_exception import (SSHException, BadAuthenticationType, ChannelException, ProxyCommandFailure) import Xlib.support.connect as xlib_connect from threading import Thread - +from autosubmit.helpers.utils import as_rsync class ParamikoPlatform(Platform): @@ -380,7 +380,8 @@ class ParamikoPlatform(Platform): raise AutosubmitCritical( "Wrong User or invalid .ssh/config. Or invalid user in platform.conf or public key not set ", 7051, e.message) - def move_file(self, src, dest, must_exist=False): + + def move_file(self, src, dest, must_exist=False, path_root=None): """ Moves a file on the platform (includes .err and .out) :param src: source name @@ -388,22 +389,27 @@ class ParamikoPlatform(Platform): :param dest: destination name :param must_exist: ignore if file exist or not :type dest: str + :param path_root: root path + :type path_root: str """ - try: + if path_root is None: path_root = self.get_files_path() + try: src = os.path.join(path_root, src) dest = os.path.join(path_root, dest) - self._ftpChannel.rename(src,dest) + self._ftpChannel.rename(src, dest) return True - except IOError as e: if str(e) in "Garbage": raise AutosubmitError('File {0} does not exists, something went wrong with the platform'.format(os.path.join(path_root,src)), 6004, e.message) + if e.message.lower() in "failure": + Log.warning("FTP Channel did not work due an inter-device operation... Rsync will be used") + return False if must_exist: raise AutosubmitError("File {0} does not exists".format( os.path.join(path_root,src)), 6004, e.message) else: - Log.debug("File {0} doesn't exists ".format(path_root)) + Log.debug("File {0} was already moved or couldn't be moved".format(src)) return False except Exception as e: if str(e) in "Garbage": diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 5c61eb93d4ac77ab155235d6cdef62b88171a776..e8f9dd723a8556ee9f2c692d88d7e1c717e342bc 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -151,7 +151,7 @@ class Platform(object): """ raise NotImplementedError - def move_file(self, src, dest): + def move_file(self, src, dest, must_exists = False, path_root = None): """ Moves a file on the platform :param src: source name diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 3e8c67bf2e0d02e682e1c5761bf90de4811ca2a3..98db510a39941b663a6143fe01380c2a47981150 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -244,8 +244,10 @@ processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) if node: machines += node +"_NEWLINE_" cores -= 1 + else: + break # Break the loop if cores is still greater than 0 - if cores > 0: + else: break for rest in range(processors_per_node-tasks): if len(all_cores) > 0: @@ -760,8 +762,10 @@ processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) if node: machines += node +"_NEWLINE_" cores -= 1 + else: + break # Break the loop if cores is still greater than 0 - if cores > 0: + else: break for rest in range(processors_per_node-tasks): if len(all_cores) > 0: diff --git a/test/unit/test_autosubmit_config.py b/test/unit/test_autosubmit_config.py index 00e62440627e409325a5faff649333577f4311b1..dc2beb171f64e76dc06ff08adbf9ea55abde3cf6 100644 --- a/test/unit/test_autosubmit_config.py +++ b/test/unit/test_autosubmit_config.py @@ -119,7 +119,7 @@ class TestAutosubmitConfig(TestCase): def test_get_tasks(self): # arrange expected_value = '99999' - default_value = '0' + default_value = '1' config, parser_mock = self._arrange_config(expected_value) # act returned_value = config.get_tasks(self.section) diff --git a/test/unit/test_job.py b/test/unit/test_job.py index 59e1f51fc89670e28e0c2cfe12538fad4a5c4d8e..93a6076a6e8647d91480c47d66109d260a2a7c2d 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -292,10 +292,10 @@ class TestJob(TestCase): self.job.parameters['PROJECT_TYPE'] = "none" dummy_serial_platform = Mock() dummy_serial_platform.name = 'serial' - dummy_platform = Mock() + dummy_platform = MagicMock() dummy_platform.serial_platform = dummy_serial_platform dummy_platform.custom_directives = '["whatever"]' - + dummy_platform.proccesors_per_node = MagicMock(return_value=None) self.job._platform = dummy_platform # Act diff --git a/test/unit/test_machinefiles_wrapper.py b/test/unit/test_machinefiles_wrapper.py index b3c91cebc4112de8f72f7f89b29d403011d1f432..0f1a1ac5db476acbbee20204c269c3089b071878 100644 --- a/test/unit/test_machinefiles_wrapper.py +++ b/test/unit/test_machinefiles_wrapper.py @@ -37,7 +37,7 @@ class TestMachinefiles(TestCase): machinefiles_dict[job] = machines """).format(nodes, cores_list, self.job_scripts, wrapper_builder._indent(machinefiles_code, 4)) - exec (script, result) + exec(script, result) machinefiles_dict = result["machinefiles_dict"] all_machines = list()