diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index d946626320ed51f9c6c7fe48282040218dac1f75..0d76c7d187679697c1ed128c268b286f4a105708 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2687,7 +2687,7 @@ class Autosubmit: current_table_structure = get_structure(expid, BasicConfig.STRUCTURES_DIR) subjobs = [] for job in job_list.get_job_list(): - job_info = JobList.retrieve_times(job.status, job.name, job._tmp_path, make_exception=False, + job_info = JobList.retrieve_times(job.status, job.name, job._tmp_path, make_exception=True, job_times=None, seconds=True, job_data_collection=None) time_total = (job_info.queue_time + job_info.run_time) if job_info else 0 subjobs.append( diff --git a/autosubmit/experiment/statistics.py b/autosubmit/experiment/statistics.py index 793210923d0e28c3325cc1ad098e2dc73621d51c..0188f00811c9f5dc0ec0af9520fe3b6cefefb25a 100644 --- a/autosubmit/experiment/statistics.py +++ b/autosubmit/experiment/statistics.py @@ -17,6 +17,7 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . 
+import math import datetime from autosubmit.job.job import Job from autosubmit.monitor.utils import FixedSizeList @@ -107,6 +108,31 @@ class ExperimentStats(object): def fail_run(self): return FixedSizeList(self._fail_run, 0.0) + def _estimate_requested_nodes(self,nodes,processors,tasks,processors_per_node) -> int: + if str(nodes).isdigit(): + return int(nodes) + elif str(tasks).isdigit(): + return math.ceil(int(processors) / int(tasks)) + elif str(processors_per_node).isdigit() and int(processors) > int(processors_per_node): + return math.ceil(int(processors) / int(processors_per_node)) + else: + return 1 + + def _calculate_processing_elements(self,nodes,processors,tasks,processors_per_node,exclusive) -> int: + if str(processors_per_node).isdigit(): + if str(nodes).isdigit(): + return int(nodes) * int(processors_per_node) + else: + estimated_nodes = self._estimate_requested_nodes(nodes,processors,tasks,processors_per_node) + if not exclusive and estimated_nodes <= 1 and int(processors) <= int(processors_per_node): + return int(processors) + else: + return estimated_nodes * int(processors_per_node) + elif (str(tasks).isdigit() or str(nodes).isdigit()): + Log.warning(f'Missing PROCESSORS_PER_NODE. Should be set if TASKS or NODES are defined. 
The PROCESSORS will be used instead.') + return int(processors) + + def _calculate_stats(self): """ Main calculation """ @@ -116,6 +142,10 @@ class ExperimentStats(object): for i, job in enumerate(self._jobs_list): last_retrials = job.get_last_retrials() processors = job.total_processors + nodes = job.nodes + tasks = job.tasks + processors_per_node = job.processors_per_node + processors = self._calculate_processing_elements(nodes, processors, tasks, processors_per_node, job.exclusive) for retrial in last_retrials: if Job.is_a_completed_retrial(retrial): # The retrial has all necessary values and is status COMPLETED @@ -158,8 +188,7 @@ class ExperimentStats(object): self._total_jobs_run += len(last_retrials) self._total_jobs_failed += self.failed_jobs[i] self._threshold = max(self._threshold, job.total_wallclock) - self._expected_cpu_consumption += job.total_wallclock * \ - int(processors) + self._expected_cpu_consumption += job.total_wallclock * int(processors) self._expected_real_consumption += job.total_wallclock self._total_queueing_time += self._queued[i] diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index ce5b8fbe50615e3bfafa05985b886d99f6600545..c16924e1f5c0ac8a2e971e0b60dcb1f6fd5803a3 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -163,6 +163,7 @@ class Job(object): 'M': '%M%', 'M_': '%M_%', 'm': '%m%', 'm_': '%m_%'} self._threads = '1' self._processors = '1' + self._processors_per_node = None self._memory = '' self._memory_per_task = '' self._chunk = None @@ -776,6 +777,17 @@ class Job(object): def processors(self, value): self._processors = value + @property + @autosubmit_parameter(name=['processors_per_node']) + def processors_per_node(self): + """Number of processors per node that the job can use.""" + return self._processors_per_node + + @processors_per_node.setter + def processors_per_node(self, value): + """Number of processors per node that the job can use.""" + self._processors_per_node = value + def inc_fail_count(self):
""" Increments fail count @@ -1600,6 +1612,7 @@ class Job(object): self.total_jobs = as_conf.jobs_data[self.section].get("TOTALJOBS", job_platform.total_jobs) self.max_waiting_jobs = as_conf.jobs_data[self.section].get("MAXWAITINGJOBS", job_platform.max_waiting_jobs) self.processors = as_conf.jobs_data[self.section].get("PROCESSORS",as_conf.platforms_data.get(job_platform.name,{}).get("PROCESSORS","1")) + self.processors_per_node = as_conf.jobs_data[self.section].get("PROCESSORS_PER_NODE",as_conf.platforms_data.get(job_platform.name,{}).get("PROCESSORS_PER_NODE","1")) self.nodes = as_conf.jobs_data[self.section].get("NODES",as_conf.platforms_data.get(job_platform.name,{}).get("NODES","")) self.exclusive = as_conf.jobs_data[self.section].get("EXCLUSIVE",as_conf.platforms_data.get(job_platform.name,{}).get("EXCLUSIVE",False)) self.threads = as_conf.jobs_data[self.section].get("THREADS",as_conf.platforms_data.get(job_platform.name,{}).get("THREADS","1")) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 9645f493f5f38e39fa7ec068536493020e1b44ee..ef2800988ed5ccafe5baff8df54ab046db6f692a 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -397,9 +397,13 @@ class DicJobs: job.partition = str(parameters[section].get( "PARTITION", "")) job.check = str(parameters[section].get( "CHECK", "true")).lower() job.export = str(parameters[section].get( "EXPORT", "")) - job.processors = str(parameters[section].get( "PROCESSORS", "")) - job.threads = str(parameters[section].get( "THREADS", "")) - job.tasks = str(parameters[section].get( "TASKS", "")) + # Used by Stat command # check in 4.1+ as this doesn't exist + job.processors = str(parameters[section].get("PROCESSORS",self.experiment_data.get("PLATFORMS",{}).get(job.platform_name,{}).get("PROCESSORS",""))) + job.processors_per_node = str(parameters[section].get("PROCESSORS_PER_NODE",self.experiment_data.get("PLATFORMS",{}).get(job.platform_name,{}).get("PROCESSORS_PER_NODE",""))) + 
job.nodes = str(parameters[section].get("NODES",self.experiment_data.get("PLATFORMS",{}).get(job.platform_name,{}).get("NODES",""))) + job.threads = str(parameters[section].get("THREADS",self.experiment_data.get("PLATFORMS",{}).get(job.platform_name,{}).get("THREADS",""))) + job.tasks = str(parameters[section].get("TASKS",self.experiment_data.get("PLATFORMS",{}).get(job.platform_name,{}).get("TASKS",""))) + job.exclusive = parameters[section].get("EXCLUSIVE",self.experiment_data.get("PLATFORMS",{}).get(job.platform_name,{}).get("EXCLUSIVE",False)) job.memory = str(parameters[section].get("MEMORY", "")) job.memory_per_task = str(parameters[section].get("MEMORY_PER_TASK", "")) remote_max_wallclock = self.experiment_data["PLATFORMS"].get(job.platform_name,{}) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 3365bd8c9990a09e9af92d138d2d259df622fa26..d7648a2e96a520fde2d7e89fc310afa3284f2a97 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -2975,17 +2975,17 @@ class JobList(object): # Using standard procedure if status_code in [Status.RUNNING, Status.SUBMITTED, Status.QUEUING, - Status.FAILED] or make_exception is True: + Status.FAILED] or make_exception is True: # COMPLETED adds too much overhead so these values are now stored in a database and retrieved separately submit_time, start_time, finish_time, status = JobList._job_running_check( status_code, name, tmp_path) - if status_code in [Status.RUNNING, Status.FAILED]: + if status_code in [Status.RUNNING, Status.FAILED, Status.COMPLETED]: running_for_min = (finish_time - start_time) queuing_for_min = (start_time - submit_time) submit_time = mktime(submit_time.timetuple()) start_time = mktime(start_time.timetuple()) finish_time = mktime(finish_time.timetuple()) if status_code in [ - Status.FAILED] else 0 + Status.FAILED, Status.COMPLETED] else 0 else: queuing_for_min = ( datetime.datetime.now() - submit_time) diff --git a/autosubmit/statistics/jobs_stat.py 
b/autosubmit/statistics/jobs_stat.py index b2d1de97b02f51c41555046304d32743d4c106be..8eec5ec65aea75b5187779d0820dbb523b4b0048 100644 --- a/autosubmit/statistics/jobs_stat.py +++ b/autosubmit/statistics/jobs_stat.py @@ -1,27 +1,53 @@ #!/bin/env/python from datetime import datetime, timedelta from .utils import timedelta2hours +from log.log import Log +import math class JobStat(object): - def __init__(self, name, processors, wallclock, section, date, member, chunk): - # type: (str, int, float, str, str, str, str) -> None - self._name = name - self._processors = processors - self._wallclock = wallclock - self.submit_time = None # type: datetime - self.start_time = None # type: datetime - self.finish_time = None # type: datetime - self.completed_queue_time = timedelta() - self.completed_run_time = timedelta() - self.failed_queue_time = timedelta() - self.failed_run_time = timedelta() - self.retrial_count = 0 - self.completed_retrial_count = 0 - self.failed_retrial_count = 0 - self.section = section - self.date = date - self.member = member - self.chunk = chunk + def __init__(self, name, processors, wallclock, section, date, member, chunk, processors_per_node, tasks, nodes, exclusive ): + # type: (str, int, float, str, str, str, str, str, str , str, str) -> None + self._name = name + self._processors = self._calculate_processing_elements(nodes, processors, tasks, processors_per_node, exclusive) + self._wallclock = wallclock + self.submit_time = None # type: datetime + self.start_time = None # type: datetime + self.finish_time = None # type: datetime + self.completed_queue_time = timedelta() + self.completed_run_time = timedelta() + self.failed_queue_time = timedelta() + self.failed_run_time = timedelta() + self.retrial_count = 0 + self.completed_retrial_count = 0 + self.failed_retrial_count = 0 + self.section = section + self.date = date + self.member = member + self.chunk = chunk + + def _estimate_requested_nodes(self,nodes,processors,tasks,processors_per_node) -> int: 
+ if str(nodes).isdigit(): + return int(nodes) + elif str(tasks).isdigit(): + return math.ceil(int(processors) / int(tasks)) + elif str(processors_per_node).isdigit() and int(processors) > int(processors_per_node): + return math.ceil(int(processors) / int(processors_per_node)) + else: + return 1 + + def _calculate_processing_elements(self,nodes,processors,tasks,processors_per_node,exclusive) -> int: + if str(processors_per_node).isdigit(): + if str(nodes).isdigit(): + return int(nodes) * int(processors_per_node) + else: + estimated_nodes = self._estimate_requested_nodes(nodes,processors,tasks,processors_per_node) + if not exclusive and estimated_nodes <= 1 and int(processors) <= int(processors_per_node): + return int(processors) + else: + return estimated_nodes * int(processors_per_node) + elif (str(tasks).isdigit() or str(nodes).isdigit()): + Log.warning(f'Missing PROCESSORS_PER_NODE. Should be set if TASKS or NODES are defined. The PROCESSORS will be used instead.') + return int(processors) def inc_retrial_count(self): self.retrial_count += 1 @@ -51,7 +77,7 @@ class JobStat(object): @property def expected_cpu_consumption(self): return self._wallclock * self._processors - + @property def name(self): return self._name diff --git a/autosubmit/statistics/statistics.py b/autosubmit/statistics/statistics.py index 9f759065761fa54ad889cf6214b9cb3c03e89371..0f4037793bc3c4c6fea3d2fa95324951240ab6d2 100644 --- a/autosubmit/statistics/statistics.py +++ b/autosubmit/statistics/statistics.py @@ -47,9 +47,8 @@ class Statistics(object): for index, job in enumerate(self._jobs): retrials = job.get_last_retrials() for retrial in retrials: - print(retrial) job_stat = self._name_to_jobstat_dict.setdefault(job.name, JobStat(job.name, parse_number_processors( - job.processors), job.total_wallclock, job.section, job.date, job.member, job.chunk)) + job.processors), job.total_wallclock, job.section, job.date, job.member, job.chunk, job.processors_per_node, job.tasks, job.nodes, job.exclusive
)) job_stat.inc_retrial_count() if Job.is_a_completed_retrial(retrial): job_stat.inc_completed_retrial_count()