# Basic installs in case some package is missing; add any packages you need in this section
#!pip install pandas
#!pip install numpy
#!pip install matplotlib
#!pip install seaborn
#!pip install scikit-learn
# Initial imports
import pandas as pd
import numpy as np
import re
import os
# Imports for Data Analysis part
import matplotlib.pyplot as plt
import seaborn as sns
Note: If an updated version of joined_dataframe.csv already exists, you can jump to section 3
1. Data Loading
We start by checking if a previous version of the dataframe exists, loading it if it does and creating an empty one if it doesn't.
if os.path.isfile("exp_dataframe.csv"):
experiments_dataframe = pd.read_csv("exp_dataframe.csv")
else:
experiments_dataframe = pd.DataFrame()
# Define the path to the different out_ww_T_dxx_yy_fszz_C.slurm files
file_path = "/home/emillan/Documents/dev/out_experiment_files_fairshare"
Now we define a way to read the output files produced by the SLURM simulator. Each line consists of key-value pairs: a label (a word) followed by an equals sign and its value (any sequence of non-whitespace characters).
# Define a regular expression pattern to match each key-value pair that comes in the output files
# (\w+) matches one or more word characters (letters, digits, or underscores)
# = matches the equal sign character
# ([^\s]+) matches one or more non-whitespace characters
pattern = re.compile(r"(\w+)=([^\s]+)")
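As a quick sanity check, here is a minimal sketch of the pattern applied to a made-up line (the field names mirror those in the real output files, but the values are invented):
# Hypothetical sample line in the simulator's output format
sample_line = "JobId=8625 UserId=723 JobState=COMPLETED SubmitTime=1774539"
print(pattern.findall(sample_line))
# Expected: [('JobId', '8625'), ('UserId', '723'), ('JobState', 'COMPLETED'), ('SubmitTime', '1774539')]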
We create (or load) an information dataframe covering all chosen workflow sizes, both horizontal and vertical, both wrapped and unwrapped, combined with all possible fair share values and all datetimes at which the workflow is injected. We then add a column identifying the experiment each row belongs to, which lets us separate the experiments during analysis while keeping a single dataframe for the entire corpus. Finally, we add a column indicating whether the experiment has already been run, so that we can keep track and avoid repeating experiments.
# If the file already exists, we just read it
if os.path.isfile("experiments_info.csv"):
experiments_info = pd.read_csv("experiments_info.csv")
else:
experiments_info = pd.DataFrame(columns = ["file_name", "wf_size", "wf_type", "date", "fairshare", "wrapped", "completed"])
workflow_sizes = [2, 8, 14, 20]
workflow_type = ["H", "V"]
dates = ["d14_10", "d14_15", "d14_20", "d15_10","d15_15", "d15_20"]
fairshare_values = [0.1, 0.5, 1.0]
wrap_condition = ["W", "U"]
for size in workflow_sizes:
for wf in workflow_type:
for date in dates:
for fairshare in fairshare_values:
for condition in wrap_condition:
fairshare_name = f"{fairshare}".replace(".", "") # This is to avoid trouble with the filename
out_file_name = f"out_{size}_{wf}_{date}_fs{fairshare_name}_{condition}"
experiments_info.loc[len(experiments_info)] = [out_file_name, size, wf, date, fairshare, condition == "W", False]
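As a quick check of the naming scheme and catalogue size (4 sizes × 2 types × 6 dates × 3 fair-share values × 2 wrap conditions = 288 combinations):
# Sketch: sanity-check the generated catalogue
print(experiments_info["file_name"].iloc[0])  # e.g., out_2_H_d14_10_fs01_W
print(f"{len(experiments_info)} experiment definitions")  # 288 when generated fresh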
# We define a function to do an initial type conversion (most likely, more changes will be needed during analysis depending on the methods used)
def change_types(df : pd.DataFrame) -> pd.DataFrame:
# Place here all type conversions you want to perform
convert_dict = {
"JobId" : int,
"UserId" : int,
"GroupId" : int,
"SubmitTime" : int,
"StartTime" : int,
"EndTime" : int,
"NodeCnt" : int,
"ProcCnt" : int,
"Priority" : int
}
df = df.astype(convert_dict)
return df
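Note that astype raises an error if a column is missing or a value cannot be parsed; a more defensive variant (a sketch, not used below; the name change_types_safe is ours) could coerce instead:
def change_types_safe(df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical defensive variant: convert only the columns that exist,
    # coercing unparseable values to NaN via the nullable Int64 dtype
    int_columns = ["JobId", "UserId", "GroupId", "SubmitTime", "StartTime",
                   "EndTime", "NodeCnt", "ProcCnt", "Priority"]
    for col in int_columns:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")
    return df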
# We read a .csv called experiments_status that keeps a record of the completed experiments:
experiments_status_path = "/home/emillan/Documents/dev/nextflow_code/experiments_status.csv"
experiments_status = pd.read_csv(experiments_status_path)
We update both exp_dataframe and experiments_info and save them.
for experiment in experiments_info["file_name"]:
# We avoid reloading already performed experiments into the dataframe
if not experiments_info[experiments_info["file_name"] == experiment]["completed"].values[0]:
exp_path = os.path.join(file_path, experiment + ".slurm")
# We check if the simulation has been done: the output file exists and experiment is marked as completed
if os.path.isfile(exp_path) and experiments_status[experiments_status["file_name"] == experiment]["completed"].values[0]:
# Initialize a list to hold the job dictionaries
jobs = []
# Open and read the file
with open(exp_path, "r") as file:
for line in file:
# Find all key-value pairs in the line
# This returns a list of tuples, where each tuple is (key, value)
matches = pattern.findall(line)
# Convert matches to a dictionary and add it to the jobs list
# Dictionary comprehension to convert list of tuples to dictionary
job_dict = {key: value for key, value in matches}
jobs.append(job_dict)
# Create a DataFrame from the list of job dictionaries
df = pd.DataFrame(jobs)
# Specify the path where you want to save the whole workloads as CSVs
csvs_path = "full_exp_csvs"
df.to_csv(csvs_path+"/"+experiment+"_csv.csv", index=False)
# We perform some type conversions to be able to properly generate new variables
df = change_types(df)
# We save a copy of the dataframe to calculate in use and requested resources at the time of submission
temp = df.copy()
# After saving the whole workload, we select only the lines referring to the workflow added by us (user 723)
df = df[(df["UserId"]==723) & (df["StartTime"] > 100)]
# We calculate the used and requested resources at the time of submission
submit_time = df["SubmitTime"].min()
in_use = temp[(temp["StartTime"] < submit_time) & (temp["EndTime"] > submit_time)]["ProcCnt"].sum()
requested = temp[(temp["SubmitTime"] < submit_time) & (temp["StartTime"] > submit_time)]["ProcCnt"].sum()
# We add some columns that will give extra context
df["wait_time"] = df["StartTime"] - df["SubmitTime"]
df["execution_time"] = df["EndTime"] - df["StartTime"]
df["time"] = df["wait_time"] + df["execution_time"]
df["cpus_in_use"] = in_use
df["cpus_requested"] = requested
# We add a column that will act as an identifier and will allow us to later do a join (for analysis) with the "experiments_info" dataframe
df["exp"] = experiment
# We append the rows of interest (those of user 723) to the dataframe that contains all experiments performed
experiments_dataframe = pd.concat([experiments_dataframe, df], ignore_index=True)
#Since we have completed this experiment, we update its "completed" variable to True in the experiments info dataframe
experiments_info.loc[experiments_info["file_name"] == experiment, "completed"] = True
# Save the new version of both the experiments dataframe and the experiments info as CSV files
experiments_dataframe.to_csv("exp_dataframe.csv", index=False)
experiments_info.to_csv("experiments_info.csv", index=False)
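Optionally, a quick consistency check (a sketch) confirms that every experiment marked as completed now appears in the combined dataframe:
# Sketch: cross-check completed flags against the experiments actually loaded
loaded = set(experiments_dataframe["exp"]) if "exp" in experiments_dataframe.columns else set()
completed = set(experiments_info.loc[experiments_info["completed"], "file_name"])
print(f"{len(completed)} completed experiments, {len(loaded)} present in exp_dataframe")
print("Completed but missing from the dataframe:", completed - loaded)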
2. Data Cleaning and Pre-processing
We read the previously updated exp_dataframe.csv.
experiments_dataframe = pd.read_csv("exp_dataframe.csv")
experiments_dataframe.head()
JobId | UserId | GroupId | Name | JobState | Partition | TimeLimit | SubmitTime | StartTime | EndTime | ... | ProcCnt | WorkDir | Backfilled | Priority | wait_time | execution_time | time | cpus_in_use | cpus_requested | exp | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 8625 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1774539 | 1774591 | 1776391 | ... | 192 | /tmp | yes | 38634 | 52 | 1800 | 1852 | 75280 | 109520 | out_2_H_d14_10_fs01_W |
1 | 8625 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1774539 | 1774713 | 1776513 | ... | 96 | /tmp | yes | 38636 | 174 | 1800 | 1974 | 92240 | 234432 | out_2_H_d14_10_fs01_U |
2 | 8626 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1774539 | 1774774 | 1776574 | ... | 96 | /tmp | yes | 38644 | 235 | 1800 | 2035 | 92240 | 234432 | out_2_H_d14_10_fs01_U |
3 | 8625 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1773616 | 1773616 | 1775416 | ... | 192 | /tmp | no | 59514 | 0 | 1800 | 1800 | 91760 | 233200 | out_2_H_d14_10_fs05_W |
4 | 8626 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1773616 | 1773616 | 1775416 | ... | 96 | /tmp | no | 60136 | 0 | 1800 | 1800 | 91760 | 233200 | out_2_H_d14_10_fs05_U |
5 rows × 22 columns
experiments_dataframe.dtypes
JobId              int64
UserId             int64
GroupId            int64
Name              object
JobState          object
Partition         object
TimeLimit         object
SubmitTime         int64
StartTime          int64
EndTime            int64
NodeList          object
NodeCnt            int64
ProcCnt            int64
WorkDir           object
Backfilled        object
Priority           int64
wait_time          int64
execution_time     int64
time               int64
cpus_in_use        int64
cpus_requested     int64
exp               object
dtype: object
We delete all rows with SubmitTime equal to 100, since these are artificial jobs added to give the user a certain fair share value.
experiments_dataframe = experiments_dataframe[experiments_dataframe["SubmitTime"] != 100]
experiments_dataframe.head()
JobId | UserId | GroupId | Name | JobState | Partition | TimeLimit | SubmitTime | StartTime | EndTime | ... | ProcCnt | WorkDir | Backfilled | Priority | wait_time | execution_time | time | cpus_in_use | cpus_requested | exp | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 8625 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1774539 | 1774591 | 1776391 | ... | 192 | /tmp | yes | 38634 | 52 | 1800 | 1852 | 75280 | 109520 | out_2_H_d14_10_fs01_W |
1 | 8625 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1774539 | 1774713 | 1776513 | ... | 96 | /tmp | yes | 38636 | 174 | 1800 | 1974 | 92240 | 234432 | out_2_H_d14_10_fs01_U |
2 | 8626 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1774539 | 1774774 | 1776574 | ... | 96 | /tmp | yes | 38644 | 235 | 1800 | 2035 | 92240 | 234432 | out_2_H_d14_10_fs01_U |
3 | 8625 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1773616 | 1773616 | 1775416 | ... | 192 | /tmp | no | 59514 | 0 | 1800 | 1800 | 91760 | 233200 | out_2_H_d14_10_fs05_W |
4 | 8626 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1773616 | 1773616 | 1775416 | ... | 96 | /tmp | no | 60136 | 0 | 1800 | 1800 | 91760 | 233200 | out_2_H_d14_10_fs05_U |
5 rows × 22 columns
Below we preview the dataframe with the columns that are not needed for the analysis dropped. Most of them are identifiers that provide no information, others are not useful for our analysis, and others can be inferred from other variables (as an exception to this last case, we keep ProcCnt and some time variables that could be useful).
# Note: the result is not assigned back; this only previews the reduced frame,
# and later steps simply select the columns they need
experiments_dataframe.drop(columns=["JobId","UserId", "GroupId", "Name", "JobState", "Partition", "TimeLimit", "NodeList", "WorkDir", "NodeCnt"])
SubmitTime | StartTime | EndTime | ProcCnt | Backfilled | Priority | wait_time | execution_time | time | cpus_in_use | cpus_requested | exp | |
---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 1774539 | 1774591 | 1776391 | 192 | yes | 38634 | 52 | 1800 | 1852 | 75280 | 109520 | out_2_H_d14_10_fs01_W |
1 | 1774539 | 1774713 | 1776513 | 96 | yes | 38636 | 174 | 1800 | 1974 | 92240 | 234432 | out_2_H_d14_10_fs01_U |
2 | 1774539 | 1774774 | 1776574 | 96 | yes | 38644 | 235 | 1800 | 2035 | 92240 | 234432 | out_2_H_d14_10_fs01_U |
3 | 1773616 | 1773616 | 1775416 | 192 | no | 59514 | 0 | 1800 | 1800 | 91760 | 233200 | out_2_H_d14_10_fs05_W |
4 | 1773616 | 1773616 | 1775416 | 96 | no | 60136 | 0 | 1800 | 1800 | 91760 | 233200 | out_2_H_d14_10_fs05_U |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
1723 | 1922823 | 1923980 | 1925780 | 96 | yes | 72795 | 1157 | 1800 | 2957 | 92976 | 202416 | out_20_V_d15_20_fs10_U |
1724 | 1924623 | 1925810 | 1927610 | 96 | yes | 72795 | 1187 | 1800 | 2987 | 92976 | 202416 | out_20_V_d15_20_fs10_U |
1725 | 1926423 | 1927640 | 1929440 | 96 | yes | 72795 | 1217 | 1800 | 3017 | 92976 | 202416 | out_20_V_d15_20_fs10_U |
1726 | 1928223 | 1929470 | 1931270 | 96 | yes | 72162 | 1247 | 1800 | 3047 | 92976 | 202416 | out_20_V_d15_20_fs10_U |
1727 | 1930023 | 1931300 | 1933100 | 96 | yes | 72162 | 1277 | 1800 | 3077 | 92976 | 202416 | out_20_V_d15_20_fs10_U |
1728 rows × 12 columns
Before aggregating non-wrapped jobs, we do a quick analysis of the evolution of priority. This is to determine which aggregation function works best for priority (other variables have clear aggregation methods due to their nature, as will be shown later), since it is a tricky variable to aggregate.
# Get the experiments where the last character of the file name is "U" (unwrapped)
unwrapped = experiments_dataframe[experiments_dataframe["exp"].str[-1] == "U"].copy()
# Reset the index to use it for sorting
unwrapped = unwrapped.reset_index()
# Sort by SubmitTime and the new index to ensure ranking is done correctly
unwrapped = unwrapped.sort_values(by=["SubmitTime", "index"])
# Create a variable called "order", which for each experiment, will represent the order in which the jobs were submitted (1, 2, 3, ...)
# using method="first" to assign ranks uniquely even in case of ties
unwrapped["order"] = unwrapped.groupby("exp")["SubmitTime"].rank(method="first", ascending=True)
# Change order type to integer
unwrapped["order"] = unwrapped["order"].astype(int)
unwrapped.head()
index | JobId | UserId | GroupId | Name | JobState | Partition | TimeLimit | SubmitTime | StartTime | ... | WorkDir | Backfilled | Priority | wait_time | execution_time | time | cpus_in_use | cpus_requested | exp | order | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
4 | 7 | 8624 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1773423 | 1773423 | ... | /tmp | no | 100010 | 0 | 1800 | 1800 | 93168 | 234160 | out_2_H_d14_10_fs10_U | 1 |
5 | 8 | 8625 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1773423 | 1773554 | ... | /tmp | yes | 89897 | 131 | 1800 | 1931 | 93168 | 234160 | out_2_H_d14_10_fs10_U | 2 |
40 | 61 | 8624 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1773423 | 1773423 | ... | /tmp | no | 100010 | 0 | 1800 | 1800 | 91008 | 219728 | out_2_V_d14_10_fs10_U | 1 |
88 | 127 | 8624 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1773423 | 1773423 | ... | /tmp | no | 100010 | 0 | 1800 | 1800 | 60400 | 307376 | out_8_H_d14_10_fs10_U | 1 |
89 | 128 | 8625 | 723 | 723 | sim_job | COMPLETED | normal | 1-16:00:00 | 1773423 | 1773428 | ... | /tmp | no | 100010 | 5 | 1800 | 1805 | 60400 | 307376 | out_8_H_d14_10_fs10_U | 2 |
5 rows × 24 columns
# Extract the unique workflow sizes from the "exp" column
unwrapped["workflow_size"] = unwrapped["exp"].str.extract(r"out_(\d+)_")[0]
unique_sizes = unwrapped["workflow_size"].unique()
# Create a plot for each workflow size
for size in unique_sizes:
plt.figure(figsize=(12, 8))
# Filter the DataFrame for the current workflow size
subset_unwrapped = unwrapped[unwrapped["workflow_size"] == size]
# Plot the line plot
sns.lineplot(
data=subset_unwrapped,
x="order",
y="Priority",
hue="exp",
style="exp",
markers=True,
dashes=False
)
# Set plot labels and title
plt.xlabel("Order")
plt.ylabel("Priority")
plt.title(f"Priority vs. Order for Workflow Size {size}")
plt.legend(title="Experiment", bbox_to_anchor=(1.05, 1), loc="upper left")
# Set x-ticks to include all unique "order" values
plt.xticks(ticks=subset_unwrapped["order"].unique())
# Show plot
plt.tight_layout()
plt.show()
Considering that in the worst case the priority can change by more than 30000 units from the first to the last job of the workflow, we will use the minimum priority of the entire workflow as the aggregation function. The rationale is that the job with the lowest priority is the one that will delay the workflow's completion the most. It is important to remember that we do this aggregation to facilitate the comparison between an aggregated experiment (a single job) and the same experiment without a wrapper (a vector of jobs).
def backfill_percentage(experiment):
return (experiment == "yes").mean() * 100 # Since backfill is a binary variable, we can calculate the percentage of backfilled jobs of the given workflow
def custom_aggregation(experiment):
result = {
"SubmitTime": experiment["SubmitTime"].min(),
"StartTime": experiment["StartTime"].min(),
"EndTime": experiment["EndTime"].max(),
"Priority": experiment["Priority"].min(),
"Backfilled": backfill_percentage(experiment["Backfilled"]),
"cpus_in_use": experiment["cpus_in_use"].mean(),
"cpus_requested": experiment["cpus_requested"].mean(),
}
# Conditional aggregation for wait_time, execution_time and time for Horizontal or Vertical experiments
if "H" in experiment["exp"].iloc[0]:
result["wait_time"] = experiment["wait_time"].mean()
result["execution_time"] = experiment["execution_time"].mean()
result["time"] = experiment["time"].mean()
result["ProcCnt"] = experiment["ProcCnt"].sum()
else: # Vertical experiment
result["wait_time"] = experiment["wait_time"].sum()
result["execution_time"] = experiment["execution_time"].sum()
result["time"] = experiment["time"].sum()
result["ProcCnt"] = experiment["ProcCnt"].mean()
return pd.Series(result)
# Apply the custom aggregation function
aggregated_exp_dataframe = experiments_dataframe.groupby("exp").apply(custom_aggregation).reset_index().rename(columns={"Backfilled": "backfill_pct"})
aggregated_exp_dataframe.head()
exp | SubmitTime | StartTime | EndTime | Priority | backfill_pct | cpus_in_use | cpus_requested | wait_time | execution_time | time | ProcCnt | |
---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | out_14_H_d14_10_fs01_U | 1774539.0 | 1774591.0 | 1776391.0 | 38622.0 | 100.000000 | 90736.0 | 238304.0 | 52.000000 | 1800.0 | 1852.000000 | 1344.0 |
1 | out_14_H_d14_10_fs01_W | 1774539.0 | 1775689.0 | 1777489.0 | 38883.0 | 100.000000 | 92656.0 | 234448.0 | 1150.000000 | 1800.0 | 2950.000000 | 1344.0 |
2 | out_14_H_d14_10_fs05_U | 1773616.0 | 1773616.0 | 1775491.0 | 60136.0 | 0.000000 | 92464.0 | 234144.0 | 32.142857 | 1800.0 | 1832.142857 | 1344.0 |
3 | out_14_H_d14_10_fs05_W | 1773616.0 | 1773616.0 | 1775416.0 | 60270.0 | 0.000000 | 91760.0 | 233200.0 | 0.000000 | 1800.0 | 1800.000000 | 1344.0 |
4 | out_14_H_d14_10_fs10_U | 1773423.0 | 1773423.0 | 1775354.0 | 89898.0 | 92.857143 | 93168.0 | 230064.0 | 121.642857 | 1800.0 | 1921.642857 | 1344.0 |
Now that we have successfully aggregated the unwrapped workflows into single instances of our dataframe, we compute a join with the experiments_info dataframe to get general information about the experiments that will help us analyze the results.
joined_dataframe = aggregated_exp_dataframe.join(experiments_info.set_index("file_name"), on="exp").drop(columns=["completed"])
joined_dataframe.head()
exp | SubmitTime | StartTime | EndTime | Priority | backfill_pct | cpus_in_use | cpus_requested | wait_time | execution_time | time | ProcCnt | wf_size | wf_type | date | fairshare | wrapped | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | out_14_H_d14_10_fs01_U | 1774539.0 | 1774591.0 | 1776391.0 | 38622.0 | 100.000000 | 90736.0 | 238304.0 | 52.000000 | 1800.0 | 1852.000000 | 1344.0 | 14 | H | d14_10 | 0.1 | False |
1 | out_14_H_d14_10_fs01_W | 1774539.0 | 1775689.0 | 1777489.0 | 38883.0 | 100.000000 | 92656.0 | 234448.0 | 1150.000000 | 1800.0 | 2950.000000 | 1344.0 | 14 | H | d14_10 | 0.1 | True |
2 | out_14_H_d14_10_fs05_U | 1773616.0 | 1773616.0 | 1775491.0 | 60136.0 | 0.000000 | 92464.0 | 234144.0 | 32.142857 | 1800.0 | 1832.142857 | 1344.0 | 14 | H | d14_10 | 0.5 | False |
3 | out_14_H_d14_10_fs05_W | 1773616.0 | 1773616.0 | 1775416.0 | 60270.0 | 0.000000 | 91760.0 | 233200.0 | 0.000000 | 1800.0 | 1800.000000 | 1344.0 | 14 | H | d14_10 | 0.5 | True |
4 | out_14_H_d14_10_fs10_U | 1773423.0 | 1773423.0 | 1775354.0 | 89898.0 | 92.857143 | 93168.0 | 230064.0 | 121.642857 | 1800.0 | 1921.642857 | 1344.0 | 14 | H | d14_10 | 1.0 | False |
We now add an additional variable called response_time, calculated as the end time of the last job of the workflow minus the submit time of the first job. We also create a normalized version of this variable to allow for more accurate comparisons, computed by dividing the response time by the total runtime (execution time) of the workflow. We likewise normalize ProcCnt with respect to the total number of CPUs on the machine, which is 93312, and do the same with cpus_in_use and cpus_requested. Finally, we create a variable called exp_general so that we can later compare experiments side by side with respect to the wrapping condition.
joined_dataframe["response_time"] = joined_dataframe["EndTime"] - joined_dataframe["SubmitTime"]
joined_dataframe["norm_response_time"] = joined_dataframe["response_time"]/joined_dataframe["execution_time"]
joined_dataframe["norm_proc_cnt"] = joined_dataframe["ProcCnt"]/93312
joined_dataframe["norm_cpus_in_use"] = joined_dataframe["cpus_in_use"]/93312
joined_dataframe["norm_cpus_requested"] = joined_dataframe["cpus_requested"]/93312
joined_dataframe["exp_general"] = joined_dataframe["exp"].str[:-1] # We remove the last character, which is the condition (wrapped or unwrapped), resulting in a general experiment identifier
print(joined_dataframe.shape)
joined_dataframe.head()
(288, 23)
exp | SubmitTime | StartTime | EndTime | Priority | backfill_pct | cpus_in_use | cpus_requested | wait_time | execution_time | ... | wf_type | date | fairshare | wrapped | response_time | norm_response_time | norm_proc_cnt | norm_cpus_in_use | norm_cpus_requested | exp_general | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | out_14_H_d14_10_fs01_U | 1774539.0 | 1774591.0 | 1776391.0 | 38622.0 | 100.000000 | 90736.0 | 238304.0 | 52.000000 | 1800.0 | ... | H | d14_10 | 0.1 | False | 1852.0 | 1.028889 | 0.014403 | 0.972394 | 2.553841 | out_14_H_d14_10_fs01_ |
1 | out_14_H_d14_10_fs01_W | 1774539.0 | 1775689.0 | 1777489.0 | 38883.0 | 100.000000 | 92656.0 | 234448.0 | 1150.000000 | 1800.0 | ... | H | d14_10 | 0.1 | True | 2950.0 | 1.638889 | 0.014403 | 0.992970 | 2.512517 | out_14_H_d14_10_fs01_ |
2 | out_14_H_d14_10_fs05_U | 1773616.0 | 1773616.0 | 1775491.0 | 60136.0 | 0.000000 | 92464.0 | 234144.0 | 32.142857 | 1800.0 | ... | H | d14_10 | 0.5 | False | 1875.0 | 1.041667 | 0.014403 | 0.990912 | 2.509259 | out_14_H_d14_10_fs05_ |
3 | out_14_H_d14_10_fs05_W | 1773616.0 | 1773616.0 | 1775416.0 | 60270.0 | 0.000000 | 91760.0 | 233200.0 | 0.000000 | 1800.0 | ... | H | d14_10 | 0.5 | True | 1800.0 | 1.000000 | 0.014403 | 0.983368 | 2.499143 | out_14_H_d14_10_fs05_ |
4 | out_14_H_d14_10_fs10_U | 1773423.0 | 1773423.0 | 1775354.0 | 89898.0 | 92.857143 | 93168.0 | 230064.0 | 121.642857 | 1800.0 | ... | H | d14_10 | 1.0 | False | 1931.0 | 1.072778 | 0.014403 | 0.998457 | 2.465535 | out_14_H_d14_10_fs10_ |
5 rows × 23 columns
We save the aggregated dataframe for analysis.
joined_dataframe.to_csv("joined_dataframe.csv", index=False)
3. Plot-Based Analysis
First, we define a general colorblind-friendly palette for the plots. This palette was designed by Bang Wong, and it was chosen following the article Coloring for Colorblindness by David Nichols. In addition, the specific colors of each plot were selected from the entire palette after checking that they were still clearly distinguishable when converted to black and white. This is a widely used method for determining whether a color combination is colorblind friendly.
clr_blind_palette = ["#000000", "#E69F00", "#56B4E9", "#009E73", "#F0E442", "#0072B2", "#D55E00", "#CC79A7"]
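As a rough stand-in for the black-and-white check mentioned above, one can compare the approximate grayscale value of each palette color (a sketch using the common Rec. 601 luma weights):
# Sketch: approximate grayscale (luma) value of each palette color;
# colors with clearly different luma values stay distinguishable without hue
def hex_luma(hex_color: str) -> float:
    r, g, b = (int(hex_color[i:i + 2], 16) for i in (1, 3, 5))
    return 0.299 * r + 0.587 * g + 0.114 * b

for color in clr_blind_palette:
    print(color, round(hex_luma(color), 1))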
joined_dataframe = pd.read_csv("joined_dataframe.csv")
joined_dataframe.head()
exp | SubmitTime | StartTime | EndTime | Priority | backfill_pct | cpus_in_use | cpus_requested | wait_time | execution_time | ... | wf_type | date | fairshare | wrapped | response_time | norm_response_time | norm_proc_cnt | norm_cpus_in_use | norm_cpus_requested | exp_general | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | out_14_H_d14_10_fs01_U | 1774539.0 | 1774591.0 | 1776391.0 | 38622.0 | 100.000000 | 90736.0 | 238304.0 | 52.000000 | 1800.0 | ... | H | d14_10 | 0.1 | False | 1852.0 | 1.028889 | 0.014403 | 0.972394 | 2.553841 | out_14_H_d14_10_fs01_ |
1 | out_14_H_d14_10_fs01_W | 1774539.0 | 1775689.0 | 1777489.0 | 38883.0 | 100.000000 | 92656.0 | 234448.0 | 1150.000000 | 1800.0 | ... | H | d14_10 | 0.1 | True | 2950.0 | 1.638889 | 0.014403 | 0.992970 | 2.512517 | out_14_H_d14_10_fs01_ |
2 | out_14_H_d14_10_fs05_U | 1773616.0 | 1773616.0 | 1775491.0 | 60136.0 | 0.000000 | 92464.0 | 234144.0 | 32.142857 | 1800.0 | ... | H | d14_10 | 0.5 | False | 1875.0 | 1.041667 | 0.014403 | 0.990912 | 2.509259 | out_14_H_d14_10_fs05_ |
3 | out_14_H_d14_10_fs05_W | 1773616.0 | 1773616.0 | 1775416.0 | 60270.0 | 0.000000 | 91760.0 | 233200.0 | 0.000000 | 1800.0 | ... | H | d14_10 | 0.5 | True | 1800.0 | 1.000000 | 0.014403 | 0.983368 | 2.499143 | out_14_H_d14_10_fs05_ |
4 | out_14_H_d14_10_fs10_U | 1773423.0 | 1773423.0 | 1775354.0 | 89898.0 | 92.857143 | 93168.0 | 230064.0 | 121.642857 | 1800.0 | ... | H | d14_10 | 1.0 | False | 1931.0 | 1.072778 | 0.014403 | 0.998457 | 2.465535 | out_14_H_d14_10_fs10_ |
5 rows × 23 columns
In most of the following analysis, we will distinguish between horizontal and vertical workflows because of their very different nature. Within this distinction, we will also make most comparisons by workflow size to make the results easier to understand and compare between sizes.
3.1 Response time analysis
We will start by analyzing the response time, the target variable for this project, since the main idea of wrappers is to reduce the total time a job spends in the machine, both waiting and being executed. We will later perform a very similar analysis, this time with the normalized response time (the response time divided by the execution time), to get a more general picture that allows some other comparisons.
# Create a figure with two subplots side by side
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
# Plot for Horizontal workflows
sns.barplot(x="wf_size", y="response_time", hue="wrapped", data=joined_dataframe[joined_dataframe["wf_type"] == "H"], ax=axes[0], palette=clr_blind_palette[5:7], errorbar=("pi", 50))
axes[0].set_title("Mean Response Time by Workflow Size for Horizontal Workflows")
axes[0].set_ylabel("Response Time")
axes[0].set_xlabel("Workflow Size")
# Plot for Vertical workflows
vertical_plot = sns.barplot(x="wf_size", y="response_time", hue="wrapped", data=joined_dataframe[joined_dataframe["wf_type"] == "V"], ax=axes[1], palette=clr_blind_palette[5:7], errorbar=("pi", 50))
axes[1].set_title("Mean Response Time by Workflow Size for Vertical Workflows")
axes[1].set_ylabel("")
axes[1].set_xlabel("Workflow Size")
# Annotate bars in the right plot with mean values
for container in vertical_plot.containers:
for bar in container:
height = bar.get_height()
vertical_plot.annotate(f"{height:.2f}",
xy=(bar.get_x() + bar.get_width() / 2, height),
xytext=(0, 5), # Adjust the second value for vertical offset
textcoords="offset points",
ha="center", va="bottom")
# Remove legends from individual plots
axes[0].legend_.remove()
axes[1].legend_.remove()
# Add gridlines to the y-axis
for ax in axes:
ax.yaxis.grid(True, linestyle="--", which="both", color="gray", alpha=0.7)
# Create a single legend for the entire figure at the specified position
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="center left", bbox_to_anchor=(1, 0.5), title="Wrapped")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1]) # Adjust rect to make space for legend
# Display the plots
plt.show()
The first two graphs are paired bar graphs showing the mean response time values of unwrapped and wrapped workflows for each workflow size, with one of these graphs for each workflow type: horizontal (executed in parallel) and vertical (executed sequentially due to dependencies). The bars also include an error bar representing the interquartile range (IQR).
At first glance, we can see a general trend: on average, for horizontal workflows, wrapped workflows have a longer response time, and for vertical workflows, wrapped workflows have a shorter response time. This fits with the discussion in Manuel's thesis, where he found that for his case study, vertical workflows usually benefited from using wrappers, while horizontal workflows typically did not improve and sometimes even worsened.
However, these plots do not allow for a good direct comparison between horizontal and vertical workflows, mainly because of the scale of response time for each type of workflow (for horizontal workflows, execution time is constant, whereas for vertical workflows, it increases linearly with size). The following graphs help with this problem by sharing the y-axis, making it easier to grasp the differences between the two types of workflows.
# Create a figure with 2 subplots for horizontal and vertical workflows
fig, axes = plt.subplots(1, 2, figsize=(16, 6), sharey=True)
# Horizontal workflows
sns.stripplot(
x="wf_size", y="response_time", hue="wrapped",
data=joined_dataframe[joined_dataframe["wf_type"] == "H"],
ax=axes[0], dodge=True, jitter=True, palette=clr_blind_palette[5:7]
)
axes[0].set_title("Response Time Distribution by Workflow Size for Horizontal Workflows")
axes[0].set_ylabel("Response Time")
axes[0].set_xlabel("Workflow Size")
# Vertical workflows
sns.stripplot(
x="wf_size", y="response_time", hue="wrapped",
data=joined_dataframe[joined_dataframe["wf_type"] == "V"],
ax=axes[1], dodge=True, jitter=True, palette=clr_blind_palette[5:7]
)
axes[1].set_title("Response Time Distribution by Workflow Size for Vertical Workflows")
axes[1].set_ylabel("")
axes[1].set_xlabel("Workflow Size")
# Remove legends from individual plots
for ax in axes.flatten():
ax.legend_.remove()
# Add gridlines to the y-axis
for ax in axes.flatten():
ax.yaxis.grid(True, linestyle="--", which="both", color="gray", alpha=0.7)
# Create a single legend for the entire figure at the specified position
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="center left", bbox_to_anchor=(1, 0.5), title="Wrapped")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1]) # Adjust rect to make space for legend
# Display the plots
plt.show()
These scatter plots show the response time distributions for unwrapped and wrapped workflows with the previous differentiations (size and workflow type).
Here we have a more detailed version of the information from the previous plot. We can observe that, in general, the horizontal workflows seem to have a higher variance than the vertical ones, since there are more extreme values. This seems to happen especially for the horizontal wrapped workflows, which is consistent with the previous studies (Manuel G. Marciani and Pablo Goitia), which showed that the result of wrapping a horizontal workflow is typically unclear.
# Calculate mean response times for wrapped True and False
mean_response_times = joined_dataframe.groupby(["wf_size", "fairshare", "wrapped", "wf_type"])["response_time"].mean().reset_index()
# Pivot the data to get mean response times for wrapped True and False side by side
pivot_means = mean_response_times.pivot_table(index=["wf_size", "fairshare", "wf_type"], columns="wrapped", values="response_time").reset_index()
# Calculate the difference between wrapped and unwrapped workflows
pivot_means["difference"] = pivot_means[True] - pivot_means[False]
# Create a figure with two subplots side by side
fig, axes = plt.subplots(1, 2, figsize=(16, 6), sharey=True)
# Add gridlines to the y-axis
for ax in axes:
ax.yaxis.grid(True, linestyle="--", which="both", color="gray", alpha=0.7)
# Plot for Horizontal workflows
sns.barplot(x="wf_size", y="difference", hue="fairshare", data=pivot_means[pivot_means["wf_type"] == "H"], ax=axes[0], palette=clr_blind_palette[5:8])
axes[0].set_title("Mean Difference of Wrapped and Unwrapped Response Time for Horizontal Workflows")
axes[0].set_ylabel("Difference of Mean Response Time (Wrapped - Unwrapped)")
axes[0].set_xlabel("Workflow Size")
# Plot for Vertical workflows
sns.barplot(x="wf_size", y="difference", hue="fairshare", data=pivot_means[pivot_means["wf_type"] == "V"], ax=axes[1], palette=clr_blind_palette[5:8])
axes[1].set_title("Mean Difference of Wrapped and Unwrapped Response Time for Vertical Workflows")
axes[1].set_ylabel("")
axes[1].set_xlabel("Workflow Size")
# Remove legends from individual plots
axes[0].legend_.remove()
axes[1].legend_.remove()
# Create a single legend for the entire figure at the specified position
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="center left", bbox_to_anchor=(1, 0.5), title="Fairshare")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1]) # Adjust rect to make space for legend
# Display the plots
plt.show()
Although the effect of fair share may seem predictable (higher fair share, better response time, regardless of the use of wrappers), this plot shows that, specifically for vertical workflows, the difference between wrapping and not wrapping is usually accentuated the lower the fair share is (with some exceptions, perhaps caused by the small amount of data), and even more so as the size of the workflow increases. For horizontal workflows, we cannot extract a clear trend due to the high variance of the results.
Note: we are computing the difference of the means, which in this case is equivalent to the mean of the differences.
Proof that the Difference of Means Equals the Mean of the Differences
Given two sets of paired data points $x_1, x_2, \dots, x_n$ and $y_1, y_2, \dots, y_n$, the mean of the differences is
$$ \text{Mean of Differences} = \frac{1}{n} \sum_{i=1}^{n} (x_i - y_i) $$
and the difference of means is
$$ \text{Difference of Means} = \left(\frac{1}{n} \sum_{i=1}^{n} x_i\right) - \left(\frac{1}{n} \sum_{i=1}^{n} y_i\right) $$
Step-by-Step Proof:
Expanding the mean of the differences,
$$ \frac{1}{n} \sum_{i=1}^{n} (x_i - y_i) = \frac{1}{n} \left( (x_1 - y_1) + (x_2 - y_2) + \dots + (x_n - y_n) \right) = \frac{1}{n} \left( \sum_{i=1}^{n} x_i - \sum_{i=1}^{n} y_i \right) $$
Distributing the $\frac{1}{n}$ over the two sums,
$$ \frac{1}{n} \sum_{i=1}^{n} x_i - \frac{1}{n} \sum_{i=1}^{n} y_i = \left(\frac{1}{n} \sum_{i=1}^{n} x_i\right) - \left(\frac{1}{n} \sum_{i=1}^{n} y_i\right) $$
which is exactly the difference of means.
3.2 Normalized response time analysis
In order to make an apples-to-apples comparison of whether the use of wrappers improves response time, we now examine a normalized version of it: the response time divided by the execution time. This value starts at 1, when there is no wait time and every job in the workflow runs immediately, and grows without bound as the wait time increases. This normalization removes the scale problem related to workflow size noted earlier, allowing for a better analysis.
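As a worked example, the wrapped experiment out_14_H_d14_10_fs01_W shown earlier has a response time of 2950 and an execution time of 1800, so
$$ \text{norm\_response\_time} = \frac{2950}{1800} \approx 1.64 $$
meaning the workflow spent roughly 64% of its runtime waiting in the queue.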
# Create a figure with two subplots side by side, sharing the y-axis
fig, axes = plt.subplots(1, 2, figsize=(16, 6), sharey=True)
# Plot for Horizontal workflows
sns.barplot(x="wf_size", y="norm_response_time", hue="wrapped", data=joined_dataframe[joined_dataframe["wf_type"] == "H"], ax=axes[0], palette=clr_blind_palette[5:7], errorbar=("pi", 50))
axes[0].set_title("Mean Normalized Response Time by Workflow Size for Horizontal Workflows")
axes[0].set_ylabel("Normalized Response Time")
axes[0].set_xlabel("Workflow Size")
# Plot for Vertical workflows
vertical_plot = sns.barplot(x="wf_size", y="norm_response_time", hue="wrapped", data=joined_dataframe[joined_dataframe["wf_type"] == "V"], ax=axes[1], palette=clr_blind_palette[5:7], errorbar=("pi", 50))
axes[1].set_title("Mean Normalized Response Time by Workflow Size for Vertical Workflows")
axes[1].set_ylabel("")
axes[1].set_xlabel("Workflow Size")
# Annotate bars in the right plot with mean values
for container in vertical_plot.containers:
for bar in container:
height = bar.get_height()
vertical_plot.annotate(f"{height:.2f}",
xy=(bar.get_x() + bar.get_width() / 2, height),
xytext=(0, 5), # Adjust the second value for vertical offset
textcoords="offset points",
ha="center", va="bottom")
# Remove legends from individual plots
axes[0].legend_.remove()
axes[1].legend_.remove()
# Add gridlines to the y-axis
for ax in axes:
ax.yaxis.grid(True, linestyle="--", which="both", color="gray", alpha=0.7)
# Create a single legend for the entire figure at the specified position
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="center left", bbox_to_anchor=(1, 0.5), title="Wrapped")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1]) # Adjust rect to make space for legend
# Display the plots
plt.show()
# Print the top 5 horizontal experiments with the highest normalized response time for wrapped workflows
top_horizontal_wrapped = joined_dataframe[(joined_dataframe["wf_type"] == "H") & (joined_dataframe["wrapped"])].nlargest(5, "norm_response_time")
print(top_horizontal_wrapped[["exp", "norm_response_time"]].rename(columns={"exp": "Experiment", "norm_response_time": "Normalized Response Time Wrapped"}))
# Get the corresponding unwrapped experiment normalized response time
top_horizontal_unwrapped = top_horizontal_wrapped.copy()
top_horizontal_unwrapped["exp"] = top_horizontal_unwrapped["exp"].str.replace("W", "U")
top_horizontal_unwrapped = top_horizontal_unwrapped.merge(joined_dataframe, on="exp")
print(top_horizontal_unwrapped[["exp", "norm_response_time_y"]].rename(columns={"exp": "Experiment", "norm_response_time_y": "Normalized Response Time Unwrapped"}))
                 Experiment  Normalized Response Time Wrapped
35   out_14_H_d15_20_fs10_W                          3.290556
31   out_14_H_d15_20_fs01_W                          3.288333
33   out_14_H_d15_20_fs05_W                          3.269444
105  out_20_H_d15_20_fs05_W                          3.269444
251   out_8_H_d15_20_fs10_W                          2.240000
               Experiment  Normalized Response Time Unwrapped
0  out_14_H_d15_20_fs10_U                            2.036667
1  out_14_H_d15_20_fs01_U                            1.424444
2  out_14_H_d15_20_fs05_U                            1.168333
3  out_20_H_d15_20_fs05_U                            1.303889
4   out_8_H_d15_20_fs10_U                            1.121667
This plot shows the same comparison as the one in the previous section, now with the normalized response time, so the two subplots can share the y-axis and the comparison becomes much easier. Apart from the same conclusions drawn before (vertical workflows benefit on average from wrappers), we can now confirm that the variance of horizontal workflows is much greater: the normalized response time can worsen on average by up to 50% of the runtime (see workflow size 14), and in the worst cases by a little more than 2 times the runtime (remember that if there is no wait/queue time, the normalized response time is 1).
# Create a figure with 2 subplots for horizontal and vertical workflows
fig, axes = plt.subplots(1, 2, figsize=(16, 6), sharey=True)
# Horizontal workflows
sns.stripplot(
x="wf_size", y="norm_response_time", hue="wrapped",
data=joined_dataframe[joined_dataframe["wf_type"] == "H"],
ax=axes[0], dodge=True, jitter=True, palette=clr_blind_palette[5:7]
)
axes[0].set_title("Normalized Response Time Distribution by Workflow Size for Horizontal Workflows")
axes[0].set_ylabel("Normalized Response Time")
axes[0].set_xlabel("Workflow Size")
# Vertical workflows
sns.stripplot(
x="wf_size", y="norm_response_time", hue="wrapped",
data=joined_dataframe[joined_dataframe["wf_type"] == "V"],
ax=axes[1], dodge=True, jitter=True, palette=clr_blind_palette[5:7]
)
axes[1].set_title("Normalized Response Time Distribution by Workflow Size for Vertical Workflows")
axes[1].set_ylabel("")
axes[1].set_xlabel("Workflow Size")
# Remove legends from individual plots
for ax in axes.flatten():
ax.legend_.remove()
# Add gridlines to the y-axis
for ax in axes.flatten():
ax.yaxis.grid(True, linestyle="--", which="both", color="gray", alpha=0.7)
# Create a single legend for the entire figure at the specified position
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="center left", bbox_to_anchor=(1, 0.5), title="Wrapped")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1]) # Adjust rect to make space for legend
# Display the plots
plt.show()
To see the vertical workflows in more detail, here is the same plot without a shared y-axis.
# Create a figure with 2 subplots for horizontal and vertical workflows
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
# Horizontal workflows
sns.stripplot(
x="wf_size", y="norm_response_time", hue="wrapped",
data=joined_dataframe[joined_dataframe["wf_type"] == "H"],
ax=axes[0], dodge=True, jitter=True, palette=clr_blind_palette[5:7]
)
axes[0].set_title("Normalized Response Time Distribution by Workflow Size for Horizontal Workflows")
axes[0].set_ylabel("Normalized Response Time")
axes[0].set_xlabel("Workflow Size")
# Vertical workflows
sns.stripplot(
x="wf_size", y="norm_response_time", hue="wrapped",
data=joined_dataframe[joined_dataframe["wf_type"] == "V"],
ax=axes[1], dodge=True, jitter=True, palette=clr_blind_palette[5:7]
)
axes[1].set_title("Normalized Response Time Distribution by Workflow Size for Vertical Workflows")
axes[1].set_ylabel("")
axes[1].set_xlabel("Workflow Size")
# Remove legends from individual plots
for ax in axes.flatten():
ax.legend_.remove()
# Add gridlines to the y-axis
for ax in axes.flatten():
ax.yaxis.grid(True, linestyle="--", which="both", color="gray", alpha=0.7)
# Create a single legend for the entire figure at the specified position
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="center left", bbox_to_anchor=(1, 0.5), title="Wrapped")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1]) # Adjust rect to make space for legend
# Display the plots
plt.show()
As mentioned above, when analyzing the normalized version of response time, we see a much larger variance in horizontal workflows compared to vertical workflows.
# Calculate mean response times for wrapped True and False
mean_norm_response_times = joined_dataframe.groupby(["wf_size", "fairshare", "wrapped", "wf_type"])["norm_response_time"].mean().reset_index()
# Pivot the data to get mean response times for wrapped True and False side by side
pivot_means = mean_norm_response_times.pivot_table(index=["wf_size", "fairshare", "wf_type"], columns="wrapped", values="norm_response_time").reset_index()
# Calculate the difference between wrapped and unwrapped workflows
pivot_means["difference"] = pivot_means[True] - pivot_means[False]
# Create a figure with two subplots side by side
fig, axes = plt.subplots(1, 2, figsize=(16, 6), sharey=True)
# Add gridlines to the y-axis
for ax in axes:
ax.yaxis.grid(True, linestyle="--", which="both", color="gray", alpha=0.7)
# Plot for Horizontal workflows
sns.barplot(x="wf_size", y="difference", hue="fairshare", data=pivot_means[pivot_means["wf_type"] == "H"], ax=axes[0], palette=clr_blind_palette[5:8])
axes[0].set_title("Mean Difference of Wrapped and Unwrapped Norm. Response Time for Horizontal Workflows")
axes[0].set_ylabel("Difference of Mean Normalized Response Time (Wrapped - Unwrapped)")
axes[0].set_xlabel("Workflow Size")
# Plot for Vertical workflows
sns.barplot(x="wf_size", y="difference", hue="fairshare", data=pivot_means[pivot_means["wf_type"] == "V"], ax=axes[1], palette=clr_blind_palette[5:8])
axes[1].set_title("Mean Difference of Wrapped and Unwrapped Norm. Response Time for Vertical Workflows")
axes[1].set_ylabel("")
axes[1].set_xlabel("Workflow Size")
# Remove legends from individual plots
axes[0].legend_.remove()
axes[1].legend_.remove()
# Create a single legend for the entire figure at the specified position
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="center left", bbox_to_anchor=(1, 0.5), title="Fairshare")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1]) # Adjust rect to make space for legend
# Display the plots
plt.show()
Analyzing this graph, now with the normalized response time, we see that the growing difference in vertical workflows with respect to size arises because the larger the workflow, the more queue time is saved by aggregating it. While it is true that wrappers improve the most when the fair share is small, the improvement also scales with the size of the workflows; this does not contradict the hypothesis that larger workflows are likely to save more time when wrapped.
With respect to the horizontal workflows, since the execution time is constant (the workflow is parallelized), this plot is simply a rescaled version of the previous one.
# Define a function to create the overlying histograms for a given wf_type
def plot_overlying_histograms(joined_dataframe):
wf_types = ["H", "V"]
titles = ["Horizontal", "Vertical"]
# Create a figure with two subplots side by side
fig, axs = plt.subplots(1, 2, figsize=(16, 6), sharey=True)
for i, wf_type_value in enumerate(wf_types):
# Filter the dataframe based on wf_type
filtered_df = joined_dataframe[joined_dataframe["wf_type"] == wf_type_value]
# Calculate common bin edges
min_value = filtered_df["norm_response_time"].min()
max_value = filtered_df["norm_response_time"].max()
bins = 20
bin_edges = np.linspace(min_value, max_value, bins + 1)
# Plot histogram for wrapped workflows
sns.histplot(data=filtered_df[filtered_df["wrapped"] == True],
x="norm_response_time",
color=clr_blind_palette[6],
label="Wrapped" if i == 0 else "", # Add label only for the first plot
kde=False,
bins=bin_edges,
stat="percent",
element="step",
fill=True,
alpha=0.4,
ax=axs[i])
# Plot histogram for unwrapped workflows
sns.histplot(data=filtered_df[filtered_df["wrapped"] == False],
x="norm_response_time",
color=clr_blind_palette[5],
label="Unwrapped" if i == 0 else "", # Add label only for the first plot
kde=False,
bins=bin_edges,
stat="percent",
element="step",
fill=True,
alpha=0.4,
ax=axs[i])
# Set plot title and labels
axs[i].set_title(f"Normalized Response Time Distribution for {titles[i]} Workflows")
axs[i].set_xlabel("Normalized Response Time")
axs[0].set_ylabel("Proportion")
# Create a single legend for both plots
handles, labels = axs[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="upper right", bbox_to_anchor=(1.10, 0.6))
# Show the plot
plt.tight_layout()
plt.show()
# Generate the plots for both wf_type "H" and "V"
plot_overlying_histograms(joined_dataframe)
This plot shows the (binned) distributions of the normalized response time for wrapped and unwrapped and for both horizontal and vertical workflows. We can see that for horizontal workflows, the distributions of wrapped and unwrapped are quite similar. For vertical workflows, however, we observe that the distribution for wrapped is much more compressed toward 1 (the best possible normalized response time), meaning that wrapped vertical workflows tend to have smaller normalized response times.
We can see the same thing in the following KDE plot of the distributions. A kernel density estimate (KDE) plot is a way to visualize the distribution of observations in a dataset, similar to a histogram. A KDE represents the data as a continuous probability density curve in one or more dimensions. Although it is an estimate (which may contain distortions if the underlying distribution is bounded or not smooth), it helps us see the overall behavior of the variable being analyzed.
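Since the normalized response time is bounded below by 1, the KDE can leak density past that boundary; one possible refinement (a sketch using seaborn's clip parameter, not applied in the plots below) is to clip the estimated support:
# Sketch: clip the KDE support at the theoretical lower bound of 1
sns.kdeplot(data=joined_dataframe[joined_dataframe["wrapped"]],
            x="norm_response_time", clip=(1, None), fill=True)
plt.show()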
# Define a function to create the overlying KDE plots for a given wf_type
def plot_overlying_kde(joined_dataframe):
wf_types = ["H", "V"]
titles = ["Horizontal", "Vertical"]
# Create a figure with two subplots side by side
fig, axs = plt.subplots(1, 2, figsize=(16, 6), sharex=True, sharey=True)
for i, wf_type_value in enumerate(wf_types):
# Filter the dataframe based on wf_type
filtered_df = joined_dataframe[joined_dataframe["wf_type"] == wf_type_value]
# Plot KDE for wrapped workflows
sns.kdeplot(data=filtered_df[filtered_df["wrapped"] == True],
x="norm_response_time",
color=clr_blind_palette[6],
label="Wrapped" if i == 0 else "", # Add label only for the first plot
fill=True,
alpha=0.4,
ax=axs[i])
# Plot KDE for unwrapped workflows
sns.kdeplot(data=filtered_df[filtered_df["wrapped"] == False],
x="norm_response_time",
color=clr_blind_palette[5],
label="Unwrapped" if i == 0 else "", # Add label only for the first plot
fill=True,
alpha=0.4,
ax=axs[i])
# Set plot title and labels
axs[i].set_title(f"Normalized Response Time Distribution for {titles[i]} Workflows")
axs[i].set_xlabel("Normalized Response Time")
axs[0].set_ylabel("Density")
# Create a single legend for both plots
handles, labels = axs[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="upper right", bbox_to_anchor=(1.10, 0.6))
# Show the plot
plt.tight_layout()
plt.show()
# Generate the plots for both wf_type "H" and "V"
plot_overlying_kde(joined_dataframe)
In the following plot, we compare the normalized response time one to one across all experiments (wrapped vs. unwrapped) to see the results of these experiments in more detail. In addition, we add a line chart to each subplot showing the proportion of the machine's CPUs in use at the time the workflows were submitted; there is one line for the wrapped and one for the unwrapped submission times to differentiate them.
# Dataframe specific for the two-value lollipop plot, we only need "exp_general", "norm_response_time", "wrapped", "wf_type", "wf_size" and "norm_cpus_in_use"
lollipop_df = joined_dataframe[["exp_general", "norm_response_time", "wrapped", "wf_type", "wf_size", 'norm_cpus_in_use']].copy()
lollipop_df.head()
exp_general | norm_response_time | wrapped | wf_type | wf_size | norm_cpus_in_use | |
---|---|---|---|---|---|---|
0 | out_14_H_d14_10_fs01_ | 1.028889 | False | H | 14 | 0.972394 |
1 | out_14_H_d14_10_fs01_ | 1.638889 | True | H | 14 | 0.992970 |
2 | out_14_H_d14_10_fs05_ | 1.041667 | False | H | 14 | 0.990912 |
3 | out_14_H_d14_10_fs05_ | 1.000000 | True | H | 14 | 0.983368 |
4 | out_14_H_d14_10_fs10_ | 1.072778 | False | H | 14 | 0.998457 |
# Define the workflow sizes and types
wf_sizes = [2, 8, 14, 20]
wf_types = ["H", "V"]
# Create a figure with 4 rows and 2 columns of subplots, with larger width
fig, axes = plt.subplots(4, 2, figsize=(24, 18)) # Increased figsize for overall plot size
# Initialize a list to store handles and labels for the secondary legend
secondary_handles_labels = []
# Define a function to create lollipop plots
def create_lollipop_plot(ax, data, wf_size, wf_type):
# Pivot the data to get wrapped and unwrapped values side by side
pivot_data = data.pivot(index="exp_general", columns="wrapped", values="norm_response_time")
pivot_data.columns = ["Unwrapped", "Wrapped"] # Rename columns for clarity
pivot_data = pivot_data.dropna() # Drop any rows with NaN values
# The horizontal plot is made using the vlines function
ax.vlines(x=pivot_data.index, ymin=pivot_data["Unwrapped"], ymax=pivot_data["Wrapped"], color="grey", alpha=0.4)
ax.scatter(pivot_data.index, pivot_data["Unwrapped"], color=clr_blind_palette[5], alpha=1, label="Unwrapped")
ax.scatter(pivot_data.index, pivot_data["Wrapped"], color=clr_blind_palette[6], alpha=0.6, label="Wrapped")
# Rotate x-axis labels for better readability
ax.set_xticks(range(len(pivot_data.index)))
ax.set_xticklabels(pivot_data.index, rotation=90)
# Add title and axis names
title = "Horizontal" if wf_type == "H" else "Vertical"
ax.set_title(f"{title} Workflows - Workflow Size {wf_size}", loc="left")
ax.set_xlabel("Experiment")
if ax in axes[:, 0]: # Check if the axis is in the first column
ax.set_ylabel("Normalized Response Time")
# Create a secondary y-axis
ax2 = ax.twinx()
# Plot the norm_cpus_in_use on the secondary y-axis with reduced opacity
for wrapped_status, color in zip([0, 1], [clr_blind_palette[4], clr_blind_palette[7]]):
filtered_data = data[data['wrapped'] == wrapped_status]
line, = ax2.plot(filtered_data['exp_general'], filtered_data['norm_cpus_in_use'],
color=color, marker='s', linestyle='-', alpha=0.8, # Set alpha for transparency
label=f'{"Wrapped" if wrapped_status else "Unwrapped"} Norm. CPUs in Use')
# Collect the handles and labels for the secondary legend
secondary_handles_labels.append((line, line.get_label()))
# Set the y-axis label for the secondary axis only for the right column
# (note: "col" comes from the plotting loop below, which is the only caller of this function)
if col == 1:
ax2.set_ylabel('CPU Usage Proportion')
# Plot for each combination of wf_type and wf_size
for row, wf_size in enumerate(wf_sizes):
for col, wf_type in enumerate(wf_types):
# Filter the data based on wf_type and wf_size
filtered_data = lollipop_df[(lollipop_df["wf_type"] == wf_type) & (lollipop_df["wf_size"] == wf_size)]
# Create the lollipop plot if there is data for the given combination
if not filtered_data.empty:
create_lollipop_plot(axes[row, col], filtered_data, wf_size, wf_type)
else:
axes[row, col].set_visible(False)
# Create a single legend for the entire figure for the lollipop plot
handles, labels = axes[0, 0].get_legend_handles_labels()
fig.legend(handles, labels, loc="upper right", bbox_to_anchor=(1.014, 0.57), title="Wrapping")
# Create the secondary legend manually
secondary_handles, secondary_labels = zip(*secondary_handles_labels) # Unzip handles and labels
fig.legend(secondary_handles[:2], secondary_labels[:2], loc="upper right", bbox_to_anchor=(1.07, 0.5), title="CPU Usage")
# Adjust layout and add padding for labels and titles
plt.subplots_adjust(wspace=0.3, hspace=0.4) # Increase space between plots
plt.tight_layout(rect=[0, 0, 0.95, 0.95]) # Adjust rect to make space for the legend
# Show the plot
plt.show()
As we have seen throughout this notebook, vertical workflows almost always benefit from using wrappers, even if the improvement may seem small. On the other hand, wrapping horizontal workflows can make the normalized response time worse in many cases, sometimes much worse (and a few times much better). This highlights how volatile wrapping a horizontal workflow can be, and motivates collecting a much larger amount of data for a deeper study on the subject.
It is important to note that in the previous plot, the y-axis is not shared across rows, so next we compute the same plot but share the y-axis to illustrate the high variability of horizontal workflows compared to vertical workflows.
# Define the workflow sizes and types
wf_sizes = [2, 8, 14, 20]
wf_types = ["H", "V"]
# Create a figure with 4 rows and 2 columns of subplots, with larger width
fig, axes = plt.subplots(4, 2, figsize=(24, 18), sharey=True) # Increased figsize for overall plot size
# Initialize a list to store handles and labels for the secondary legend
secondary_handles_labels = []
# Define a function to create lollipop plots
def create_lollipop_plot(ax, data, wf_size, wf_type):
# Pivot the data to get wrapped and unwrapped values side by side
pivot_data = data.pivot(index="exp_general", columns="wrapped", values="norm_response_time")
pivot_data.columns = ["Unwrapped", "Wrapped"] # Rename columns for clarity
pivot_data = pivot_data.dropna() # Drop any rows with NaN values
# The horizontal plot is made using the vlines function
ax.vlines(x=pivot_data.index, ymin=pivot_data["Unwrapped"], ymax=pivot_data["Wrapped"], color="grey", alpha=0.4)
ax.scatter(pivot_data.index, pivot_data["Unwrapped"], color=clr_blind_palette[5], alpha=1, label="Unwrapped")
ax.scatter(pivot_data.index, pivot_data["Wrapped"], color=clr_blind_palette[6], alpha=0.6, label="Wrapped")
# Rotate x-axis labels for better readability
ax.set_xticks(range(len(pivot_data.index)))
ax.set_xticklabels(pivot_data.index, rotation=90)
# Add title and axis names
title = "Horizontal" if wf_type == "H" else "Vertical"
ax.set_title(f"{title} Workflows - Workflow Size {wf_size}", loc="left")
ax.set_xlabel("Experiment")
if ax in axes[:, 0]: # Check if the axis is in the first column
ax.set_ylabel("Normalized Response Time")
# Create a secondary y-axis
ax2 = ax.twinx()
# Plot the norm_cpus_in_use on the secondary y-axis with reduced opacity
for wrapped_status, color in zip([0, 1], [clr_blind_palette[4], clr_blind_palette[7]]):
filtered_data = data[data['wrapped'] == wrapped_status]
line, = ax2.plot(filtered_data['exp_general'], filtered_data['norm_cpus_in_use'],
color=color, marker='s', linestyle='-', alpha=0.8, # Set alpha for transparency
label=f'{"Wrapped" if wrapped_status else "Unwrapped"} Norm. CPUs in Use')
# Collect the handles and labels for the secondary legend
secondary_handles_labels.append((line, line.get_label()))
# Set the y-axis label for the secondary axis only for the right column
# (note: "col" comes from the plotting loop below, which is the only caller of this function)
if col == 1:
ax2.set_ylabel('CPU Usage Proportion')
# Plot for each combination of wf_type and wf_size
for row, wf_size in enumerate(wf_sizes):
for col, wf_type in enumerate(wf_types):
# Filter the data based on wf_type and wf_size
filtered_data = lollipop_df[(lollipop_df["wf_type"] == wf_type) & (lollipop_df["wf_size"] == wf_size)]
# Create the lollipop plot if there is data for the given combination
if not filtered_data.empty:
create_lollipop_plot(axes[row, col], filtered_data, wf_size, wf_type)
else:
axes[row, col].set_visible(False)
# Create a single legend for the entire figure for the lollipop plot
handles, labels = axes[0, 0].get_legend_handles_labels()
fig.legend(handles, labels, loc="upper right", bbox_to_anchor=(1.014, 0.57), title="Wrapping")
# Create the secondary legend manually
secondary_handles, secondary_labels = zip(*secondary_handles_labels) # Unzip handles and labels
fig.legend(secondary_handles[:2], secondary_labels[:2], loc="upper right", bbox_to_anchor=(1.07, 0.5), title="CPU Usage")
# Adjust layout and add padding for labels and titles
plt.subplots_adjust(wspace=0.3, hspace=0.4) # Increase space between plots
plt.tight_layout(rect=[0, 0, 0.95, 0.95]) # Adjust rect to make space for the legend
# Show the plot
plt.show()
As explained, now that the plots share the y-axis, it is much easier to grasp the large difference in variability that horizontal workflows suffer compared to vertical workflows. It is interesting to observe that for horizontal workflows, the experiments with a large difference in normalized response time between the wrapped and unwrapped runs typically (with some exceptions) correspond to submission moments with very high machine usage (in both the wrapped and unwrapped cases).
3.3 Speedup Analysis¶
The speedup is calculated by dividing the response time of an unwrapped experiment by the response time of the same experiment but wrapped. This measure helps us determine how much a workflow benefits (or suffers) from being aggregated with wrappers.
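Just to formalize the definition above, with $t_u$ and $t_w$ denoting the response times of the unwrapped and wrapped runs of the same experiment:
$$\text{speedup} = \frac{t_u}{t_w}, \qquad \text{speedup} > 1 \iff \text{wrapping reduced the response time.}$$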
# Create a dataframe containing the speedup for every "exp_general" value, that is, the response time of the unwrapped workflow divided
# by the response time of the wrapped workflow for the same experiment. The dataframe should also contain the "wf_type", "wf_size" and "fairshare" columns.
speedup_df = joined_dataframe[joined_dataframe["wrapped"] == False].set_index(["exp_general", "wf_type", "wf_size", "fairshare"])["response_time"].rename("unwrapped_time").to_frame()
speedup_df["wrapped_time"] = joined_dataframe[joined_dataframe["wrapped"] == True].set_index(["exp_general", "wf_type", "wf_size", "fairshare"])["response_time"]
speedup_df["speedup"] = speedup_df["unwrapped_time"] / speedup_df["wrapped_time"]
speedup_df = speedup_df.reset_index()
speedup_df.head()
exp_general | wf_type | wf_size | fairshare | unwrapped_time | wrapped_time | speedup | |
---|---|---|---|---|---|---|---|
0 | out_14_H_d14_10_fs01_ | H | 14 | 0.1 | 1852.0 | 2950.0 | 0.627797 |
1 | out_14_H_d14_10_fs05_ | H | 14 | 0.5 | 1875.0 | 1800.0 | 1.041667 |
2 | out_14_H_d14_10_fs10_ | H | 14 | 1.0 | 1931.0 | 1887.0 | 1.023317 |
3 | out_14_H_d14_15_fs01_ | H | 14 | 0.1 | 1828.0 | 1828.0 | 1.000000 |
4 | out_14_H_d14_15_fs05_ | H | 14 | 0.5 | 1800.0 | 1826.0 | 0.985761 |
We will start with a bar plot similar to the ones we saw before. The plots will have a red line at speedup 1: experiments whose bars rise above this line have improved on average, while those whose bars fall below or sit right on the line have worsened or stayed the same when wrapped (again, on average).
# Create two barplots side by side, one for Horizontal workflows and one for Vertical workflows, showing the speedup for each "wf_size" and "fairshare" value. And with a red line at speedup = 1.
# Create a figure with two subplots side by side
fig, axes = plt.subplots(1, 2, figsize=(16, 6), sharey=True)
# Plot for Horizontal workflows
sns.barplot(x="wf_size", y="speedup", hue="fairshare", data=speedup_df[speedup_df["wf_type"] == "H"], ax=axes[0], palette=clr_blind_palette[5:8], errorbar=("pi", 50))
axes[0].set_title("Mean Speedup for Horizontal Workflows")
axes[0].set_ylabel("Speedup")
axes[0].set_xlabel("Workflow Size")
axes[0].axhline(y=1, color="r", linestyle="--")
# Plot for Vertical workflows
sns.barplot(x="wf_size", y="speedup", hue="fairshare", data=speedup_df[speedup_df["wf_type"] == "V"], ax=axes[1], palette=clr_blind_palette[5:8], errorbar=("pi", 50))
axes[1].set_title("Mean Speedup for Vertical Workflows")
axes[1].set_ylabel("Speedup")
axes[1].set_xlabel("Workflow Size")
axes[1].axhline(y=1, color="r", linestyle="--")
# Remove legends from individual plots
axes[0].legend_.remove()
axes[1].legend_.remove()
# Add gridlines to the y-axis
for ax in axes:
ax.yaxis.grid(True, linestyle="--", which="both", color="gray", alpha=0.7)
# Create a single legend for the entire figure at the specified position
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="center left", bbox_to_anchor=(1, 0.5), title="Fairshare")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1]) # Adjust rect to make space for legend
# Display the plots
plt.show()
As we can see, all of the vertical workflow groups fall above the line or, in the worst cases, right on it. Meanwhile, for horizontal workflows, only about a third of the bars fall above the line. This again reinforces the fact that vertical workflows mostly benefit from wrappers while horizontal workflows mostly suffer from them.
# Create a new variable that contains "Better", "Same" or "Worse" depending on the value of the "speedup" column.
speedup_df["speedup_category"] = np.where(speedup_df["speedup"] > 1, "Better", np.where(speedup_df["speedup"] == 1, "Same", "Worse"))
speedup_df.head()
exp_general | wf_type | wf_size | fairshare | unwrapped_time | wrapped_time | speedup | speedup_category | |
---|---|---|---|---|---|---|---|---|
0 | out_14_H_d14_10_fs01_ | H | 14 | 0.1 | 1852.0 | 2950.0 | 0.627797 | Worse |
1 | out_14_H_d14_10_fs05_ | H | 14 | 0.5 | 1875.0 | 1800.0 | 1.041667 | Better |
2 | out_14_H_d14_10_fs10_ | H | 14 | 1.0 | 1931.0 | 1887.0 | 1.023317 | Better |
3 | out_14_H_d14_15_fs01_ | H | 14 | 0.1 | 1828.0 | 1828.0 | 1.000000 | Same |
4 | out_14_H_d14_15_fs05_ | H | 14 | 0.5 | 1800.0 | 1826.0 | 0.985761 | Worse |
To see the big picture of the analysis, the following graph shows the percentages of "Better," "Same," or "Worse" results from applying wrappers for both vertical and horizontal workflows.
# Define the order of categories
category_order = ["Better", "Same", "Worse"]
# Create a figure with two subplots side by side, sharing the y-axis
fig, axes = plt.subplots(1, 2, figsize=(16, 6), sharey=True)
# Plot for Horizontal workflows
sns.countplot(x="speedup_category", stat="percent", data=speedup_df[speedup_df["wf_type"] == "H"], ax=axes[0], order=category_order)
axes[0].set_title("Result Percentages of Applying Wrappers for Horizontal Workflows")
axes[0].set_ylabel("Percentage")
axes[0].set_xlabel("")
# Plot for Vertical workflows
sns.countplot(x="speedup_category", stat="percent", data=speedup_df[speedup_df["wf_type"] == "V"], ax=axes[1], order=category_order)
axes[1].set_title("Result Percentages of Applying Wrappers for Vertical Workflows")
axes[1].set_ylabel("")
axes[1].set_xlabel("Speedup Category")
# Add percentages on top of the bars
for ax in axes:
for p in ax.patches:
percentage = p.get_height()
ax.annotate(f"{percentage:.1f}%", (p.get_x() + p.get_width() / 2., percentage),
ha="center", va="center", xytext=(0, 10), textcoords="offset points")
# Remove labels from individual plots
axes[0].set_xlabel("")
axes[1].set_xlabel("")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1])
# Display the plots
plt.show()
As we can see, in very few cases does the use of wrappers slow down the execution of vertical workflows, while this seems to be the rule for horizontal workflows. In the following graph, we break these results down first by fair share value and then by workflow size to see if they have a clear impact on these results.
# Create a figure with two subplots side by side
fig, axes = plt.subplots(1, 2, figsize=(16, 6), sharey=True)
# Plot for Horizontal workflows
sns.countplot(x="speedup_category", data=speedup_df[speedup_df["wf_type"] == "H"], ax=axes[0], stat="percent", hue="fairshare", palette=clr_blind_palette[5:8], order=category_order)
axes[0].set_title("Result Percentages of applying wrappers for Horizontal Workflows by Fairshare")
axes[0].set_ylabel("Percentage")
# Plot for Vertical workflows
sns.countplot(x="speedup_category", data=speedup_df[speedup_df["wf_type"] == "V"], ax=axes[1], stat="percent", hue="fairshare", palette=clr_blind_palette[5:8], order=category_order)
axes[1].set_title("Result Percentages of applying wrappers for Vertical Workflows by Fairshare")
axes[1].set_ylabel("")
axes[1].set_xlabel("Speedup Category")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1])
# Add gridlines to the y-axis
for ax in axes:
ax.yaxis.grid(True, linestyle="--", which="both", color="gray", alpha=0.7)
# Add percentages on top of the bars
for ax in axes:
for p in ax.patches:
percentage = p.get_height()
if percentage > 0:
ax.annotate(f"{percentage:.1f}%", (p.get_x() + p.get_width() / 2., percentage),
ha="center", va="center", xytext=(0, 10), textcoords="offset points")
# Remove legend from individual plots
axes[0].legend_.remove()
axes[1].legend_.remove()
# Remove labels from individual plots
axes[0].set_xlabel("")
axes[1].set_xlabel("")
# Add a legend to the figure
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="center left", bbox_to_anchor=(1, 0.5), title="Fairshare")
# Display the plots
plt.show()
We can observe that, for the cases of improvement, there is a trend toward a higher percentage at smaller fair share values. The opposite trend appears for the cases where the wrapper has no impact (neither positive nor negative) on the speedup. For the cases that degrade, horizontal workflows show higher percentages at lower fair share values, while for vertical workflows there is no clear relationship with fair share.
# Create a barplot with the counts of the "speedup_category" for horizontal and one for vertical workflows.
# Create a figure with two subplots side by side
fig, axes = plt.subplots(1, 2, figsize=(16, 6), sharey=True)
# Plot for Horizontal workflows
sns.countplot(x="speedup_category", data=speedup_df[speedup_df["wf_type"] == "H"], ax=axes[0], stat="percent", hue="wf_size", palette=clr_blind_palette[4:8], order=category_order)
axes[0].set_title("Result Percentages of applying wrappers for Horizontal Workflows by Workflow Size")
axes[0].set_ylabel("Percentage")
# Plot for Vertical workflows
sns.countplot(x="speedup_category", data=speedup_df[speedup_df["wf_type"] == "V"], ax=axes[1], stat="percent", hue="wf_size", palette=clr_blind_palette[4:8], order=category_order)
axes[1].set_title("Result Percentages of applying wrappers for Vertical Workflows by Workflow Size")
axes[1].set_ylabel("")
axes[1].set_xlabel("Speedup Category")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1])
# Add gridlines to the y-axis
for ax in axes:
ax.yaxis.grid(True, linestyle="--", which="both", color="gray", alpha=0.7)
# Add percentages on top of the bars
for ax in axes:
for p in ax.patches:
percentage = p.get_height()
if percentage > 0:
ax.annotate(f"{percentage:.1f}%", (p.get_x() + p.get_width() / 2., percentage),
ha="center", va="center", xytext=(0, 10), textcoords="offset points")
# Remove legend from individual plots
axes[0].legend_.remove()
axes[1].legend_.remove()
# Remove labels from individual plots
axes[0].set_xlabel("")
axes[1].set_xlabel("")
# Add a legend to the figure
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc="center left", bbox_to_anchor=(1, 0.5), title="Workflow Size")
# Display the plots
plt.show()
In terms of workflow size, we don't see any clear trends, except for the vertical workflows that get a better response time with the use of wrappers, where the larger the workflow, the larger the percentage of improved cases.
Let's now compute heatmaps for vertical workflows, since they are the more interesting case, and see how likely it is that the response time improves (or stays the same) when wrapping. In addition, we will also see how close to 1 the speedup remains in the cases where it falls below 1.
# Filter by wf_type
filtered_df = speedup_df[speedup_df['wf_type'] == 'V']
# Calculate the percentage of "Better" for each combination of wf_size and fairshare
percentage_better = (filtered_df[filtered_df['speedup_category'] == 'Better']
.groupby(['wf_size', 'fairshare'])
.size() / filtered_df.groupby(['wf_size', 'fairshare']).size() * 100).unstack()
# Sort the axes
percentage_better = percentage_better.sort_index(axis=0).sort_index(axis=1, ascending=False)
# Create the heatmap with wf_size on the x-axis and fairshare on the y-axis
plt.figure(figsize=(10, 8))
sns.heatmap(percentage_better.T, annot=True, fmt=".1f", vmin=0, vmax=100, cmap="coolwarm", cbar_kws={'label': 'Percentage of Better Speedup'}, linewidths=0.5)
plt.title(f"Heatmap of Percentage of Better Speedup for Vertical Workflows")
plt.xlabel('wf_size')
plt.ylabel('fairshare')
plt.show()
# Filter by wf_type
filtered_df = speedup_df[speedup_df['wf_type'] == 'V']
# Calculate the percentage of "Same" or "Better" for each combination of wf_size and fairshare
percentage_same_or_better = (filtered_df[(filtered_df['speedup_category'] == 'Better') | (filtered_df['speedup_category'] == 'Same')]
.groupby(['wf_size', 'fairshare'])
.size() / filtered_df.groupby(['wf_size', 'fairshare']).size() * 100).unstack()
# Sort the axes
percentage_same_or_better = percentage_same_or_better.sort_index(axis=0).sort_index(axis=1, ascending=False)
# Create the heatmap with wf_size on the x-axis and fairshare on the y-axis
plt.figure(figsize=(10, 8))
sns.heatmap(percentage_same_or_better.T, annot=True, fmt=".1f", vmin=0, vmax=100, cmap="coolwarm", cbar_kws={'label': 'Percentage of Same or Better Speedup'}, linewidths=0.5)
plt.title(f"Heatmap of Percentage of Same or Better Speedup for Vertical Workflows")
plt.xlabel('wf_size')
plt.ylabel('fairshare')
plt.show()
As we can see, the vast majority of vertical workflows achieve a better or equal response time with wrappers. And as the following graph shows, the ones that get worse mostly stay very close to 1.
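To put a number on that claim, we can summarize those cases directly; a minimal sketch reusing the speedup_df columns defined above (the plot below visualizes the same subset):
# Speedup summary for vertical workflows that got worse when wrapped
worse_v = speedup_df[(speedup_df["wf_type"] == "V") & (speedup_df["speedup_category"] == "Worse")]
print(worse_v["speedup"].describe())  # the min and quartiles show how close to 1 these speedups stay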
# Filter the data for vertical workflows that are worse
filtered_df = speedup_df[(speedup_df['wf_type'] == 'V') & (speedup_df['speedup_category'] == 'Worse')]
# Create a scatterplot with the fairshare as the hue and wf_size as the style
plt.figure(figsize=(12, 6))
sns.scatterplot(x='exp_general', y='speedup', hue='fairshare', style='wf_size', data=filtered_df, palette=clr_blind_palette[5:8], s=100)
# Add gridlines to the y-axis
plt.gca().yaxis.grid(True, linestyle="--", which="both", color="gray", alpha=0.7)
plt.gca().set_xticklabels([]) # Remove x-axis labels
# Add title and labels
plt.title('Speedup Values for Vertical Workflows that are Worse')
plt.xlabel('Experiment')
plt.ylabel('Speedup')
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 0.85, 1]) # Adjust rect to make space for legend
# Show the plot with the legend on the right, centered vertically
plt.legend(title='Fairshare', bbox_to_anchor=(1.05, 0.5), loc='center left', borderaxespad=0.)
plt.show()
The last graph in this section shows the overlapping distribution of speedup values for horizontal and vertical workflows.
# Define a function to create overlaid histograms for speedup values by wf_type
def plot_overlaid_speedup_histograms(speedup_df):
wf_types = ["H", "V"]
labels = ["Horizontal", "Vertical"]
colors = [clr_blind_palette[5], clr_blind_palette[6]]
# Define the number of bins
bins = 20
# Calculate the common bin edges
min_value = speedup_df["speedup"].min()
max_value = speedup_df["speedup"].max()
bin_edges = np.linspace(min_value, max_value, bins + 1)
# Create a figure
fig, ax = plt.subplots(figsize=(10, 6))
# Plot histograms for each workflow type (H and V) using the common bin edges
for wf_type_value, label, color in zip(wf_types, labels, colors):
# Filter the dataframe based on wf_type
filtered_df = speedup_df[speedup_df["wf_type"] == wf_type_value]
# Plot histogram
sns.histplot(data=filtered_df,
x="speedup",
color=color,
label=label,
kde=False,
bins=bin_edges, # Use the common bin edges
stat="percent",
element="step",
fill=True,
alpha=0.4,
ax=ax)
# Set plot title and labels
ax.set_title("Speedup Distribution for Horizontal and Vertical Workflows")
ax.set_xlabel("Speedup")
ax.set_ylabel("Proportion")
# Create a legend
ax.legend(title="Workflow Type")
# Show the plot
plt.tight_layout()
plt.show()
# Generate the plot for speedup distributions
plot_overlaid_speedup_histograms(speedup_df)
We can see that vertical workflows cluster tightly around 1, which means that, as concluded in other sections, the vast majority of these workflows benefit from wrappers. Although this benefit may seem small at first glance, given the simplicity of the method (wrapping workflows) and the size and resources required by typical climate study workflows, even a speedup slightly above 1 could imply a large saving of resources. On the other hand, this plot shows the higher volatility of horizontal workflows when aggregated: most of them are worsened or unaffected by wrappers, but some can benefit drastically from their use.
The following plot shows a kernel density estimate (KDE) of the speedup distribution for vertical and horizontal workflows, giving a smoother view of the same data.
# Define a function to create overlaid KDE plots for speedup values by wf_type
def plot_overlaid_speedup_kde(speedup_df):
wf_types = ["H", "V"]
labels = ["Horizontal", "Vertical"]
colors = [clr_blind_palette[5], clr_blind_palette[6]]
# Create a figure
fig, ax = plt.subplots(figsize=(10, 6))
# Plot KDEs for each workflow type (H and V)
for wf_type_value, label, color in zip(wf_types, labels, colors):
# Filter the dataframe based on wf_type
filtered_df = speedup_df[speedup_df["wf_type"] == wf_type_value]
# Plot KDE
sns.kdeplot(data=filtered_df,
x="speedup",
color=color,
label=label,
fill=True,
ax=ax)
# Set plot title and labels
ax.set_title("Speedup Distribution for Horizontal and Vertical Workflows")
ax.set_xlabel("Speedup")
ax.set_ylabel("Density")
# Create a legend
ax.legend(title="Workflow Type")
# Show the plot
plt.tight_layout()
plt.show()
# Generate the plot for speedup distributions
plot_overlaid_speedup_kde(speedup_df)
4. Feature Importance Analysis¶
One of the goals of this project was to determine which variables play an important role in the response time required by a workflow. In the previous plot-based analysis we already found some trends regarding the fair share and size of workflows, but the intention of this section is to analyze these and other variables in more detail.
Two different regression models were trained and a correlation analysis was performed. The common target variable in all cases is the normalized response time, and the selected features will be explained later.
In addition, the analysis is separated by size to avoid extracting an obvious implication related to size: the larger the size of the workflow, the more time it is likely to spend in the queue, which also makes the normalized response time larger. We know this in advance, and we decided to separate by size to focus on other possible insights.
# Create a figure with 4 subplots in a 1x4 grid, sharing both x and y axes
fig, axes = plt.subplots(1, 4, figsize=(24, 6), sharex=True, sharey=True)
# Define the number of bins for all histograms
num_bins = 10
# Plot histograms for each workflow size
for i, wf_size in enumerate(wf_sizes):
# Filter the data based on wf_size
filtered_data = joined_dataframe[joined_dataframe["wf_size"] == wf_size]
# Plot histogram for the whole distribution (no separation by wrapped/unwrapped)
sns.histplot(data=filtered_data,
x="norm_response_time",
color=clr_blind_palette[6],
kde=False,
bins=num_bins, # Specify the same number of bins
stat="percent",
element="step",
fill=True,
ax=axes[i])
# Set plot title
axes[i].set_title(f"Workflow Size {wf_size}")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1])
# Display the plots
plt.show()
# Create a figure with 4 subplots in a 1x4 grid, sharing both x and y axes
fig, axes = plt.subplots(1, 4, figsize=(24, 6), sharex=True, sharey=True)
# Plot KDEs for each workflow size
for i, wf_size in enumerate(wf_sizes):
# Filter the data based on wf_size
filtered_data = joined_dataframe[joined_dataframe["wf_size"] == wf_size]
# Plot KDE for the whole distribution (no separation by wrapped/unwrapped)
sns.kdeplot(data=filtered_data,
x="norm_response_time",
color=clr_blind_palette[6],
fill=True,
ax=axes[i])
# Set plot title
axes[i].set_title(f"Workflow Size {wf_size}")
# Adjust the layout to prevent overlap
plt.tight_layout(rect=[0, 0, 1, 1])
# Display the plots
plt.show()
As we can see, the distribution of the normalized response time becomes slightly more right-skewed as the size increases. Note that the second plot is again a KDE, so it is only an estimate.
Since we are separating by size, the total amount of data for each size is reduced to 288/4 = 72 samples, which means that if we use 80% of the data for training, each model will be trained with about 58 samples. This is a very limited amount of data from which to draw reliable conclusions, so the following results will only give us a general idea, which may change if more data is used.
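As a quick sanity check of these numbers, a small sketch (assuming joined_dataframe is loaded as in the previous sections):
# Samples available per workflow size (should be 288 / 4 = 72 each)
print(joined_dataframe.groupby("wf_size").size())
# Approximate number of training samples per size with an 80/20 split
print(round(72 * 0.8), "training samples per size")
# Skewness of norm_response_time per size (growing positive values match the right-skew observation above)
print(joined_dataframe.groupby("wf_size")["norm_response_time"].skew())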
4.1 Random Forest Regressor¶
The Random Forest, a commonly used machine learning algorithm that combines the output of multiple decision trees to produce a single result, uses the Gini importance, also known as Mean Decrease in Impurity. It calculates feature importance as the total decrease in node impurity from splitting on the feature, averaged over all trees in the model.
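As a minimal illustration of what these importance values mean, here is a toy sketch on synthetic data (not part of the experiment data): the values returned by feature_importances_ are the mean impurity decreases, normalized to sum to 1.
from sklearn.ensemble import RandomForestRegressor
import numpy as np
rng = np.random.default_rng(0)
X_toy = rng.normal(size=(200, 3))
y_toy = 2 * X_toy[:, 0] + 0.1 * rng.normal(size=200)  # only the first feature carries signal
rf_toy = RandomForestRegressor(n_estimators=50, random_state=0).fit(X_toy, y_toy)
print(rf_toy.feature_importances_)        # the first importance dominates
print(rf_toy.feature_importances_.sum())  # importances are normalized to sum to 1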
# Additional imports for statistical analysis
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_absolute_percentage_error
from sklearn.model_selection import train_test_split
# Print types of the dataframe
print(joined_dataframe.dtypes)
exp                     object
SubmitTime             float64
StartTime              float64
EndTime                float64
Priority               float64
backfill_pct           float64
cpus_in_use            float64
cpus_requested         float64
wait_time              float64
execution_time         float64
time                   float64
ProcCnt                float64
wf_size                  int64
wf_type                 object
date                    object
fairshare              float64
wrapped                   bool
response_time          float64
norm_response_time     float64
norm_proc_cnt          float64
norm_cpus_in_use       float64
norm_cpus_requested    float64
exp_general             object
dtype: object
To compare the performance of different models when tuning hyperparameters and to evaluate the final models, we will use the Mean Absolute Percentage Error (MAPE). We won't use the widely used Mean Squared Error because the values to be predicted (norm_response_time) are all very close to 1 and have very small differences, so MSE can be misleading when it returns low values (it squares differences that will most likely be below 1 in most cases of this project).
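A tiny numeric sketch of this point: with targets close to 1, a roughly 10% error yields an MSE that looks negligible, while MAPE reports the relative error directly.
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error
import numpy as np
y_true = np.array([1.00, 1.05, 1.10])
y_pred = np.array([1.10, 0.95, 1.20])  # each prediction is off by 0.10
print(mean_squared_error(y_true, y_pred))              # 0.01, deceptively small
print(mean_absolute_percentage_error(y_true, y_pred))  # ~0.095, i.e. ~9.5% relative error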
def rndForest(X_train, y_train, X_test, y_test):
# We use a simple GridSearch to find the best hyperparameters for the RandomForestRegressor
# Define the model
model = RandomForestRegressor(random_state=42)
# Set up the parameter grid
param_grid = {
"n_estimators": [10, 50, 100, 150, 200],
"max_depth": [None, 3, 5, 8, 10],
"min_samples_split": [2, 5, 7,10],
"min_samples_leaf": [1, 2, 4],
"max_features": ["sqrt", "log2", None]
}
# Set up cross-validation with GridSearch
grid_search = GridSearchCV(estimator=model, param_grid=param_grid,
cv=5, scoring="neg_mean_absolute_percentage_error",
n_jobs=-1, verbose=1)
# Fit grid search
grid_search.fit(X_train, y_train)
# Get the best parameters and the best model
best_params = grid_search.best_params_
best_model = grid_search.best_estimator_
print(f"Best parameters: {best_params}")
# Evaluate model
y_pred = best_model.predict(X_test)
print(f"Mean Absolute Percentage Error: {mean_absolute_percentage_error(y_test, y_pred)}")
# Get feature importances
importances = best_model.feature_importances_
feature_importance_df = pd.DataFrame({
"Feature": X_train.columns,
"Importance": importances
}).sort_values(by="Importance", ascending=False)
print(feature_importance_df)
return feature_importance_df
Before training the models, we will drop all variables that we are not going to use.
# Drop columns that are not needed (identifiers), columns directly related with response time and columns directly correlated (dependent) with others.
X = joined_dataframe.drop(columns=["response_time", "exp", "date", "exp_general", "time", "execution_time", "wf_type", "Priority",
"wait_time", "SubmitTime", "StartTime", "EndTime", "ProcCnt", "norm_proc_cnt", "cpus_in_use", "cpus_requested"])
X.head()
backfill_pct | wf_size | fairshare | wrapped | norm_response_time | norm_cpus_in_use | norm_cpus_requested | |
---|---|---|---|---|---|---|---|
0 | 100.000000 | 14 | 0.1 | False | 1.028889 | 0.972394 | 2.553841 |
1 | 100.000000 | 14 | 0.1 | True | 1.638889 | 0.992970 | 2.512517 |
2 | 0.000000 | 14 | 0.5 | False | 1.041667 | 0.990912 | 2.509259 |
3 | 0.000000 | 14 | 0.5 | True | 1.000000 | 0.983368 | 2.499143 |
4 | 92.857143 | 14 | 1.0 | False | 1.072778 | 0.998457 | 2.465535 |
We will also create a color dictionary for all variables used in the prediction so that the plot has the same colors.
# Create a color dictionary for the feature importance plot
color_dict = {}
i = 0
for feature in X.columns:
if feature != "norm_response_time":
color_dict[feature] = clr_blind_palette[i]
i += 1
Since Random Forest is a tree-based method, it does not require any kind of scaling or normalization of the data, so we leave that step for the next subsection, where the linear model does need it.
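To see why tree models are insensitive to feature scaling: splits depend only on the ordering of feature values, so a monotone rescaling should leave the predictions unchanged. A quick sketch on synthetic data (up to floating-point tie-breaking, this should print True):
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
import numpy as np
rng = np.random.default_rng(1)
X_demo = rng.normal(size=(100, 2))
y_demo = X_demo[:, 0] - X_demo[:, 1]
scaler_demo = StandardScaler().fit(X_demo)
rf_raw = RandomForestRegressor(random_state=0).fit(X_demo, y_demo)
rf_scaled = RandomForestRegressor(random_state=0).fit(scaler_demo.transform(X_demo), y_demo)
print(np.allclose(rf_raw.predict(X_demo), rf_scaled.predict(scaler_demo.transform(X_demo))))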
sizes_list = [2, 8, 14, 20]
importance_dataframe = pd.DataFrame(columns=["Feature", "Importance", "wf_size"])
# We compute the importance analysis sepparating the data by workflow size
for size in sizes_list:
X_size = X[X["wf_size"]==size]
y_size = X_size["norm_response_time"]
# After separating the data by workflow size, we drop the columns that are not needed (target and wf_size)
X_size = X_size.drop(columns=["norm_response_time", "wf_size"])
X_train, X_test, y_train, y_test = train_test_split(X_size, y_size, test_size=0.2, random_state=42)
ft_importance = rndForest(X_train, y_train, X_test, y_test)
ft_importance["wf_size"] = size
# Check if importance_dataframe is empty to assign or add the given size data
if importance_dataframe.empty:
# Directly assign ft_importance to importance_dataframe if it's empty
importance_dataframe = ft_importance
else:
# Otherwise, concatenate as usual
importance_dataframe = pd.concat([importance_dataframe, ft_importance])
Fitting 5 folds for each of 900 candidates, totalling 4500 fits
Best parameters: {'max_depth': None, 'max_features': 'sqrt', 'min_samples_leaf': 2, 'min_samples_split': 10, 'n_estimators': 10}
Mean Absolute Percentage Error: 0.04712720107834652
                Feature  Importance
0          backfill_pct    0.421799
3      norm_cpus_in_use    0.303839
4   norm_cpus_requested    0.191932
2               wrapped    0.051684
1             fairshare    0.030746
Fitting 5 folds for each of 900 candidates, totalling 4500 fits
Best parameters: {'max_depth': 5, 'max_features': 'sqrt', 'min_samples_leaf': 2, 'min_samples_split': 2, 'n_estimators': 10}
Mean Absolute Percentage Error: 0.0997016013199306
                Feature  Importance
4   norm_cpus_requested    0.438319
0          backfill_pct    0.222346
3      norm_cpus_in_use    0.216728
1             fairshare    0.080399
2               wrapped    0.042207
Fitting 5 folds for each of 900 candidates, totalling 4500 fits
Best parameters: {'max_depth': None, 'max_features': 'sqrt', 'min_samples_leaf': 4, 'min_samples_split': 2, 'n_estimators': 50}
Mean Absolute Percentage Error: 0.16797855417420385
                Feature  Importance
4   norm_cpus_requested    0.416721
3      norm_cpus_in_use    0.286494
0          backfill_pct    0.164266
1             fairshare    0.098466
2               wrapped    0.034053
Fitting 5 folds for each of 900 candidates, totalling 4500 fits
Best parameters: {'max_depth': 8, 'max_features': 'sqrt', 'min_samples_leaf': 1, 'min_samples_split': 2, 'n_estimators': 10}
Mean Absolute Percentage Error: 0.2673037700832103
                Feature  Importance
3      norm_cpus_in_use    0.479541
4   norm_cpus_requested    0.227781
0          backfill_pct    0.165965
1             fairshare    0.098200
2               wrapped    0.028513
# We change wf_size to a categorical variable for plotting purposes
importance_dataframe["wf_size"] = importance_dataframe["wf_size"].astype("category")
# We plot the feature importance evolution for each workflow size as a dot plot with dots connected by lines
plt.figure(figsize=(12, 6))
sns.lineplot(data=importance_dataframe, x="wf_size", y="Importance", hue="Feature", marker="o", palette=color_dict)
# Customizing the plot
plt.title("Feature Importance by Workflow Size for Random Forest Regressor")
plt.ylabel("Importance")
plt.xlabel("Workflow Size")
# Add gridlines to the y-axis only
plt.grid(axis="y")
# Ensure the x-axis only has ticks for values present in the data
plt.xticks(importance_dataframe["wf_size"].unique())
# Position the legend at the center right, outside the plot
plt.legend(loc="center left", bbox_to_anchor=(1, 0.5))
# Show the plot
plt.show()
As we can see, fair share and the wrap condition seem to be the least important features. However, it is interesting to note that fair share becomes more important as the workflow gets larger: since backfilling large jobs is much more difficult, backfill loses importance, and priority (which depends heavily on fair share) together with machine utilization become the main determinants of queue time. This coincides with the fact that backfill matters more for the smaller workflows: it is easier to fit a job through backfill when it is smaller, and since all submitted jobs are of the same size, the smaller the workflow, the easier it is to go through backfill (backfill scheduling allows other jobs to use reserved job slots, as long as they do not delay the start of another job).
In summary, the three most important variables are backfill and the variables related to the machine status at the time of submission. It also makes sense that for the biggest workflows CPUs in use becomes the most important feature while fair share hardly gains any importance: if the job with the highest fair share (and most likely the highest priority) needs more resources than are available at that time, it has no choice but to wait in the queue.
4.2 Linear Regression¶
Before we apply the feature scaling, we will take a look at the current distributions.
# Plot distribution of all continuous features
X.hist(figsize=(16, 12))
plt.tight_layout()
plt.show()
For linear models to work better, continuous variables should follow a normal distribution as closely as possible. To improve this condition, since only the normalized CPUs requested seems to follow something fairly similar, we will apply some transformations to the base features. We will do this individually for each size group so that the different training and test sets don't suffer from data leakage when applying such transformations. Data leakage occurs when information from outside the training set is used to build the model. This additional information can allow the model to learn or know something it would not otherwise know, and in turn invalidate the estimated performance of the model being constructed.
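A minimal sketch of the leakage-safe pattern applied below (the split names mirror the ones used later in this subsection): the scaler learns its statistics from the training split only.
from sklearn.preprocessing import StandardScaler
# Correct: fit the scaler on the training split only, then apply it to both splits
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # statistics learned here
X_test_scaled = scaler.transform(X_test)        # only applied here
# Leaky (avoid): StandardScaler().fit_transform(X) would let test-set statistics influence training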
Additionally, although fair share could be treated as a category because it only takes 3 values (in our case study), we will leave it as a continuous variable, since in reality it is continuous and this way we preserve the order relationship inherent to the data (i.e. fair share 1 > fair share 0.5, not simply two different groups or categories).
continuous_features = ["backfill_pct", "norm_cpus_in_use", "norm_cpus_requested", "fairshare"]
Since we have just a few samples of each size, we will train a linear model with Ridge regression, where the loss function is the linear least squares function and the regularization is given by the L2 norm, to prevent overfitting as much as possible. In addition, we apply cross-validation to decide which regularization value to use between 0.0001 and 1 (10^-4 and 10^0), since we want to prevent overfitting without promoting underfitting through a too large alpha value.
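Concretely, Ridge regression minimizes the penalized least-squares objective below (the standard form used by scikit-learn's Ridge), where a larger $\alpha$ shrinks the coefficients $\beta$ more strongly:
$$\min_{\beta}\ \lVert y - X\beta \rVert_2^2 + \alpha\, \lVert \beta \rVert_2^2$$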
This time we will use a 5-fold scheme (80% training, 20% validation per fold) to perform 5 different trainings and then average the coefficients from each training; their absolute values are taken later for the importance plot.
from sklearn.linear_model import Ridge
from sklearn.model_selection import KFold
def linearRegression(X_train, y_train, X_test, y_test):
# Define a range of alpha values to search over
alpha_range = {'alpha': np.logspace(-4, 0, 1000)}
# Initialize the Ridge model
ridge = Ridge()
# Perform grid search with cross-validation to find the best alpha
grid_search = GridSearchCV(estimator=ridge, param_grid=alpha_range, cv=5, scoring=['r2', 'neg_mean_absolute_percentage_error'], refit='neg_mean_absolute_percentage_error', n_jobs=-1, verbose=1)
grid_search.fit(X_train, y_train)
# The best alpha value found using cross validation
best_alpha = grid_search.best_params_['alpha']
print(f"Best alpha: {best_alpha}")
# Use cross-validation to fit 5 models with different training and validation sets and the best alpha, keeping the coefficients of each model
coefficients = []
for train_index, val_index in KFold(n_splits=5).split(X_train):
X_train_fold, X_val_fold = X_train.iloc[train_index], X_train.iloc[val_index]
y_train_fold, y_val_fold = y_train.iloc[train_index], y_train.iloc[val_index]
# Fit the model
ridge = Ridge(alpha=best_alpha)
ridge.fit(X_train_fold, y_train_fold)
# Print the mean squared error for each fold
y_pred = ridge.predict(X_val_fold)
print(f"Mean Absolute Percentage Error: {mean_absolute_percentage_error(y_val_fold, y_pred)}")
# Store the coefficients
coefficients.append(ridge.coef_)
# Calculate the mean of the coefficients
coefficients = np.mean(coefficients, axis=0)
# Normalize the coefficients so they sum to 1
coefficients = coefficients / np.sum(coefficients)
# Get feature coefficients
feature_coefficient_df = pd.DataFrame({
"Feature": X_train.columns,
"Coefficient": coefficients
}).sort_values(by="Coefficient", ascending=False)
print(feature_coefficient_df)
return feature_coefficient_df
from sklearn.preprocessing import StandardScaler
sizes_list = [2, 8, 14, 20]
importance_dataframe = pd.DataFrame(columns=["Feature", "Coefficient", "wf_size"])
# We compute the importance analysis separating the data by workflow size
for size in sizes_list:
X_size = X[X["wf_size"] == size]
y_size = X_size["norm_response_time"]
# After separating the data by workflow size, we drop the columns that are not needed (target and wf_size)
X_size = X_size.drop(columns=["norm_response_time", "wf_size"])
X_train, X_test, y_train, y_test = train_test_split(X_size, y_size, test_size=0.2, random_state=42)
# Apply transformations to normalize the continuous features (as explained, we leave fairshare as continuous)
scaler = StandardScaler()
X_train[continuous_features] = scaler.fit_transform(X_train[continuous_features])
X_test[continuous_features] = scaler.transform(X_test[continuous_features])
ft_importance = linearRegression(X_train, y_train, X_test, y_test)
ft_importance["wf_size"] = size
# Check if importance_dataframe is empty to assign or add the given size data
if importance_dataframe.empty:
importance_dataframe = ft_importance
else:
importance_dataframe = pd.concat([importance_dataframe, ft_importance])
# We change wf_size to a categorical variable for plotting purposes
importance_dataframe["wf_size"] = importance_dataframe["wf_size"].astype("category")
# We make a copy before normalizing the coefficients and taking the absolute value
imp_df = importance_dataframe.copy()
# Do the absolute value of the coefficients
importance_dataframe["Coefficient"] = np.abs(importance_dataframe["Coefficient"])
# For each size, normalize the coefficients so that they sum to 1
importance_dataframe["Coefficient"] = importance_dataframe.groupby("wf_size")["Coefficient"].transform(lambda x: x / x.sum())
# We plot the feature importance evolution for each workflow size as a dot plot with dots connected by lines
plt.figure(figsize=(12, 6))
sns.lineplot(data=importance_dataframe, x="wf_size", y="Coefficient", hue="Feature", marker="o", palette=color_dict)
# Customizing the plot
plt.title("Feature Coefficients by Workflow Size for Ridge Regression")
plt.ylabel("Coefficient")
plt.xlabel("Workflow Size")
# Add gridlines to the y-axis only
plt.grid(axis="y")
# Ensure the x-axis only has ticks for values present in the data
plt.xticks(importance_dataframe["wf_size"].unique())
# Position the legend at the center right, outside the plot
plt.legend(loc="center left", bbox_to_anchor=(1, 0.5))
# Show the plot
plt.show()
Fitting 5 folds for each of 1000 candidates, totalling 5000 fits
Best alpha: 1.0
Mean Absolute Percentage Error: 0.05545705401218481
Mean Absolute Percentage Error: 0.05908475890109969
Mean Absolute Percentage Error: 0.07057214069960328
Mean Absolute Percentage Error: 0.06684286180750026
Mean Absolute Percentage Error: 0.057359118496660685
                Feature  Coefficient
0          backfill_pct     0.743418
4   norm_cpus_requested     0.450050
2               wrapped     0.126139
1             fairshare    -0.117360
3      norm_cpus_in_use    -0.202247
Fitting 5 folds for each of 1000 candidates, totalling 5000 fits
Best alpha: 1.0
Mean Absolute Percentage Error: 0.11392159391959884
Mean Absolute Percentage Error: 0.15297316566420796
Mean Absolute Percentage Error: 0.13654938892208054
Mean Absolute Percentage Error: 0.16863355318067763
Mean Absolute Percentage Error: 0.12090820223234798
                Feature  Coefficient
0          backfill_pct     0.651987
4   norm_cpus_requested     0.215412
2               wrapped     0.070323
1             fairshare     0.067527
3      norm_cpus_in_use    -0.005249
Fitting 5 folds for each of 1000 candidates, totalling 5000 fits
Best alpha: 1.0
Mean Absolute Percentage Error: 0.16900591446320104
Mean Absolute Percentage Error: 0.26263799765733425
Mean Absolute Percentage Error: 0.11673535720483025
Mean Absolute Percentage Error: 0.2564285773395059
Mean Absolute Percentage Error: 0.1747599131774489
                Feature  Coefficient
0          backfill_pct     0.354282
2               wrapped     0.303924
1             fairshare     0.197009
4   norm_cpus_requested     0.127829
3      norm_cpus_in_use     0.016956
Fitting 5 folds for each of 1000 candidates, totalling 5000 fits
Best alpha: 1.0
Mean Absolute Percentage Error: 0.20743351558945147
Mean Absolute Percentage Error: 0.17365951480731656
Mean Absolute Percentage Error: 0.1478222707858723
Mean Absolute Percentage Error: 0.18756385262534894
Mean Absolute Percentage Error: 0.16311694589775463
                Feature  Coefficient
0          backfill_pct     3.970562
2               wrapped     0.409240
1             fairshare    -0.123102
4   norm_cpus_requested    -0.939394
3      norm_cpus_in_use    -2.317306
This result is significantly different from the one obtained with the Random Forest method. Although there are some similarities, such as CPU usage gaining a lot of importance for the largest workflows, the most important difference is that backfill maintains a very high importance across all sizes, which could be influenced by the unwrapped workflows in the case of the larger sizes (since these are individual small jobs that could be backfilled). Another clear difference is that although fair share and the wrapping condition seem to become more important as the size increases, they decrease again at size 20. A possible explanation is that the sudden increase in the importance of machine utilization makes these two variables relatively less important.
The next plot shows the raw (unnormalized) mean of the coefficients for each size.
# Create a figure with a single subplot
fig, ax = plt.subplots(figsize=(16, 6))
# Plot the paired barplot
sns.barplot(data=imp_df, x="wf_size", y="Coefficient", hue="Feature", palette=color_dict, ax=ax)
# Customizing the plot
plt.title("Mean Ridge Regression Coefficients of the Variables to predict norm_response_time, by Workflow Size")
plt.ylabel("Coefficient")
plt.xlabel("Feature")
# Add gridlines to the y-axis
plt.grid(axis="y")
# Position the legend at the center right, outside the plot
plt.legend(title="Workflow Size", loc="center left", bbox_to_anchor=(1, 0.5))
# Show the plot
plt.show()
4.3 Correlation-Based Importance¶
The final analysis is a simpler one, based only on the correlation between the normalized response time and each selected feature.
X.head()
backfill_pct | wf_size | fairshare | wrapped | norm_response_time | norm_cpus_in_use | norm_cpus_requested | |
---|---|---|---|---|---|---|---|
0 | 100.000000 | 14 | 0.1 | False | 1.028889 | 0.972394 | 2.553841 |
1 | 100.000000 | 14 | 0.1 | True | 1.638889 | 0.992970 | 2.512517 |
2 | 0.000000 | 14 | 0.5 | False | 1.041667 | 0.990912 | 2.509259 |
3 | 0.000000 | 14 | 0.5 | True | 1.000000 | 0.983368 | 2.499143 |
4 | 92.857143 | 14 | 1.0 | False | 1.072778 | 0.998457 | 2.465535 |
By default, pandas' .corr() method computes the Pearson (standard) correlation coefficient. Since this coefficient already standardizes the variables, there is no need to normalize the data this time.
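Two quick checks of these claims on synthetic data (a sketch, unrelated to the experiment data): Pearson correlation is invariant to linear rescaling, and the point-biserial correlation used below for the boolean wrapped flag coincides with Pearson on a 0/1 encoding.
import numpy as np
from scipy.stats import pointbiserialr
rng = np.random.default_rng(2)
x = rng.normal(size=100)
y = x + rng.normal(size=100)
b = (rng.random(100) > 0.5).astype(int)  # a boolean-like 0/1 variable
print(np.corrcoef(x, y)[0, 1], np.corrcoef(10 * x + 3, y)[0, 1])  # identical: rescaling doesn't change Pearson
print(pointbiserialr(b, y)[0], np.corrcoef(b, y)[0, 1])           # point-biserial equals Pearson on 0/1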
from scipy.stats import pointbiserialr
sizes_list = [2, 8, 14, 20]
correlation_dataframe = pd.DataFrame(columns=["Feature", "Coefficient", "wf_size"])
# To be able to compute correlation having a boolean variable, we convert 'wrapped' to numeric (0 and 1)
X['wrapped'] = X['wrapped'].astype(int)
# List of continuous features and the boolean variable
features = ["backfill_pct", "norm_cpus_in_use", "norm_cpus_requested", "fairshare", "wrapped"]
for size in sizes_list:
X_size = X[X["wf_size"] == size]
# Compute the Pearson correlation for all continuous variables including 'wrapped'
correlation_matrix = X_size[features + ["norm_response_time"]].corr()
# For point-biserial correlation between 'wrapped' and 'norm_response_time'
point_biserial_corr, _ = pointbiserialr(X_size['wrapped'], X_size['norm_response_time'])
# Add the point-biserial correlation manually to the matrix
correlation_matrix.loc['wrapped', 'norm_response_time'] = point_biserial_corr
correlation_matrix.loc['norm_response_time', 'wrapped'] = point_biserial_corr
# Get the correlation of 'norm_response_time' with all other features
feature_correlation = correlation_matrix["norm_response_time"].drop("norm_response_time")
# Store the correlation values in a dataframe
feature_correlation_df = pd.DataFrame({
"Feature": feature_correlation.index,
"Coefficient": feature_correlation.values,
"wf_size": size
})
# Check if correlation_dataframe is empty to assign or add the given size data
if correlation_dataframe.empty:
correlation_dataframe = feature_correlation_df
else:
correlation_dataframe = pd.concat([correlation_dataframe, feature_correlation_df])
# We change wf_size to a categorical variable for plotting purposes
correlation_dataframe["wf_size"] = correlation_dataframe["wf_size"].astype("category")
# We make a copy before normalizing the coefficients and taking the absolute value
corr_df = correlation_dataframe.copy()
# Do the absolute value of the coefficients
correlation_dataframe["Coefficient"] = np.abs(correlation_dataframe["Coefficient"])
# For each size, normalize the coefficients so that they sum to 1
correlation_dataframe["Coefficient"] = correlation_dataframe.groupby("wf_size")["Coefficient"].transform(lambda x: x / x.sum())
# We plot the feature importance evolution for each workflow size as a dot plot with dots connected by lines
plt.figure(figsize=(12, 6))
sns.lineplot(data=correlation_dataframe, x="wf_size", y="Coefficient", hue="Feature", marker="o", palette=color_dict)
# Customizing the plot
plt.title("Normalized Absolute Correlation Coefficients of the Variables with norm_response_time by Workflow Size ")
plt.ylabel("Normalized Absolute Coefficient")
plt.xlabel("Workflow Size")
# Add gridlines to the y-axis only
plt.grid(axis="y")
# Ensure the x-axis only has ticks for values present in the data
plt.xticks(correlation_dataframe["wf_size"].unique())
# Position the legend at the center right, outside the plot
plt.legend(loc="center left", bbox_to_anchor=(1, 0.5))
# Show the plot
plt.show()
This last result is quite similar to the previous one. However, while the normalized machine utilization is more important for the largest size than for the others, so is the backfill percentage. This could again be influenced by the unwrapped workflows of size 20, which could benefit greatly from backfilling.
The following plot shows the raw correlation values for the different sizes, in order to see how exactly each variable is correlated with the normalized response time.
# Create a figure with a single subplot
fig, ax = plt.subplots(figsize=(16, 6))
# Plot the paired barplot
sns.barplot(data=corr_df, x="wf_size", y="Coefficient", hue="Feature", palette=color_dict, ax=ax)
# Customizing the plot
plt.title("Correlation Coefficients of the Variables with norm_response_time by Workflow Size")
plt.ylabel("Coefficient")
plt.xlabel("Feature")
# Add gridlines to the y-axis
plt.grid(axis="y")
# Position the legend at the center right, outside the plot
plt.legend(title="Workflow Size", loc="center left", bbox_to_anchor=(1, 0.5))
# Show the plot
plt.show()
We will also separate the analysis by workflow type to see the importance of wrappers for each kind of workflow; the following code keeps only the vertical workflows.
# Sizes to consider
sizes_list = [2, 8, 14, 20]
correlation_dataframe = pd.DataFrame(columns=["Feature", "Coefficient", "wf_size"])
# Keep only the vertical workflows; since wf_type was dropped from X, we filter with the mask from joined_dataframe (same index)
X_filtered = X[joined_dataframe["wf_type"] == "V"].copy()
# Convert 'wrapped' to numeric (0 and 1) if not already done
X_filtered['wrapped'] = X_filtered['wrapped'].astype(int)
# List of continuous features and the boolean variable
features = ["backfill_pct", "norm_cpus_in_use", "norm_cpus_requested", "fairshare", "wrapped"]
for size in sizes_list:
X_size = X_filtered[X_filtered["wf_size"] == size]
# Compute the Pearson correlation for all continuous variables including 'wrapped'
correlation_matrix = X_size[features + ["norm_response_time"]].corr()
# For point-biserial correlation between 'wrapped' and 'norm_response_time'
point_biserial_corr, _ = pointbiserialr(X_size['wrapped'], X_size['norm_response_time'])
# Add the point-biserial correlation manually to the matrix
correlation_matrix.loc['wrapped', 'norm_response_time'] = point_biserial_corr
correlation_matrix.loc['norm_response_time', 'wrapped'] = point_biserial_corr
# Get the correlation of 'norm_response_time' with all other features
feature_correlation = correlation_matrix["norm_response_time"].drop("norm_response_time")
# Store the correlation values in a dataframe
feature_correlation_df = pd.DataFrame({
"Feature": feature_correlation.index,
"Coefficient": feature_correlation.values,
"wf_size": size
})
# Check if correlation_dataframe is empty to assign or add the given size data
if correlation_dataframe.empty:
correlation_dataframe = feature_correlation_df
else:
correlation_dataframe = pd.concat([correlation_dataframe, feature_correlation_df])
# We change wf_size to a categorical variable for plotting purposes
correlation_dataframe["wf_size"] = correlation_dataframe["wf_size"].astype("category")
# We make a copy before normalizing the coefficients and taking the absolute value
corr_df = correlation_dataframe.copy()
# Do the absolute value of the coefficients
correlation_dataframe["Coefficient"] = np.abs(correlation_dataframe["Coefficient"])
# For each size, normalize the coefficients so that they sum to 1
correlation_dataframe["Coefficient"] = correlation_dataframe.groupby("wf_size")["Coefficient"].transform(lambda x: x / x.sum())
# We plot the feature importance evolution for each workflow size as a dot plot with dots connected by lines
plt.figure(figsize=(12, 6))
sns.lineplot(data=correlation_dataframe, x="wf_size", y="Coefficient", hue="Feature", marker="o", palette=color_dict)
# Customizing the plot
plt.title("Normalized Absolute Correlation Coefficients of the Variables with norm_response_time by Workflow Size (Filtered by wf_type = 'V')")
plt.ylabel("Normalized Absolute Coefficient")
plt.xlabel("Workflow Size")
# Add gridlines to the y-axis only
plt.grid(axis="y")
# Ensure the x-axis only has ticks for values present in the data
plt.xticks(correlation_dataframe["wf_size"].unique())
# Position the legend at the center right, outside the plot
plt.legend(loc="center left", bbox_to_anchor=(1, 0.5))
# Show the plot
plt.show()
When we do the analysis for vertical workflows only, even though we have half the samples to compute the correlation, we can clearly see the importance of wrapping the workflow as it increases in size. This is consistent with what we have seen throughout the notebook: wrappers have a greater impact the larger the workflow. It is interesting to note that if we do not separate by type, machine usage is the most important feature for the larger workflows, but for vertical workflows it loses all importance and wrapping takes its place.
# Create a figure with a single subplot
fig, ax = plt.subplots(figsize=(16, 6))
# Plot the paired barplot
sns.barplot(data=corr_df, x="wf_size", y="Coefficient", hue="Feature", palette=color_dict, ax=ax)
# Customizing the plot
plt.title("Correlation Coefficients of the Variables with norm_response_time by Workflow Size")
plt.ylabel("Coefficient")
plt.xlabel("Feature")
# Add gridlines to the y-axis
plt.grid(axis="y")
# Position the legend at the center right, outside the plot
plt.legend(title="Workflow Size", loc="center left", bbox_to_anchor=(1, 0.5))
# Show the plot
plt.show()
As expected, the correlation with wrapped is negative, since in most cases wrapping (wrapped = True, i.e. value 1) reduces the normalized response time of vertical workflows, as we have seen throughout the notebook.