forked from msr-fiddle/blox
-
Notifications
You must be signed in to change notification settings - Fork 0
/
blox_new_flow_multi_run.py
233 lines (193 loc) · 7.72 KB
/
blox_new_flow_multi_run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)
import os
import argparse
import schedulers
from placement import *
import admission_control
from blox import BloxManager
from blox import ClusterState
from blox import JobState
import blox.utils as utils
import logging
# Logging configuration for this driver script.
def setup_logging():
    """Configure the root logger to write INFO-and-above records to a file.

    Records go to ``test_log_file.log`` (relative to the current working
    directory) using a ``timestamp - level - message`` layout.
    """
    logging.basicConfig(
        filename="test_log_file.log",
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )
def _select_placement_policy(args):
    """Instantiate the placement policy named by ``args.placement_name``.

    Raises:
        NotImplementedError: if the name matches none of the known policies.
    """
    name = args.placement_name
    if name == "Place":
        return placement.JobPlacement(args)
    if name == "PMFirst":
        return PMFirstPlacement(args)
    if name == "PAL":
        return PALPlacement(args)
    if name == "Default-Packed-NS":
        return PackedNSPlacement(args)
    if name == "Default-Packed-S":
        return PackedSPlacement(args)
    # NOTE(review): the two random policies are constructed without ``args``,
    # unlike every other policy here -- confirm against their constructors.
    if name == "Default-Random-NS":
        return RandomNSPlacement()
    if name == "Default-Random-S":
        return RandomSPlacement()
    raise NotImplementedError(f"Placement Policy {name} not Implemented")


def _select_admission_policy(args):
    """Instantiate the admission policy named by ``args.acceptance_policy``.

    Raises:
        NotImplementedError: if the name matches none of the known policies.
    """
    name = args.acceptance_policy
    if name == "AcceptAll":
        return admission_control.acceptAll(args)
    if name == "LoadBasedAccept-1.2x":
        return admission_control.loadBasedAccept(args, 1.2)
    if name == "LoadBasedAccept-1.4x":
        return admission_control.loadBasedAccept(args, 1.4)
    raise NotImplementedError(f"{name} not Implemented")


def _select_scheduling_policy(args):
    """Instantiate the scheduling policy named by ``args.scheduler_name``.

    Raises:
        NotImplementedError: if the name matches none of the known policies.
    """
    name = args.scheduler_name
    if name == "Las":
        return schedulers.Las(args)
    if name == "Fifo":
        return schedulers.Fifo(args)
    if name == "Srtf":
        return schedulers.Srtf(args)
    if name == "Optimus":
        return schedulers.Optimus(args)
    if name == "Tiresias":
        return schedulers.Tiresias(args)
    raise NotImplementedError(f"{name} not Implemented")


def main(args):
    """Run simulated scheduling experiments, one configuration after another.

    When ``args.simulate`` is set, the function repeatedly creates a
    ``BloxManager``, asks it for the next simulation configuration (a mapping
    with the keys ``scheduler``, ``load``, ``placement_policy``,
    ``acceptance_policy``, ``start_id_track`` and ``stop_id_track``), copies
    those values onto ``args``, builds the matching policies and then runs
    scheduling rounds until the manager reports termination. The outer loop
    ends when a scheduler name is empty. When ``args.simulate`` is not set,
    nothing beyond building the default policies happens.

    Args:
        args: parsed command-line namespace; it is mutated in place with each
            received configuration.

    Raises:
        NotImplementedError: if a received configuration names an unknown
            placement, admission or scheduling policy.
    """
    # Default policies are constructed up front, as in the original flow;
    # every simulated configuration replaces them before they are used.
    admission_policy = admission_control.acceptAll(args)
    placement_policy = placement.JobPlacement(args)
    scheduling_policy = schedulers.Las(args)

    if not args.simulate:
        # Only the simulation path is implemented in this driver.
        return

    while True:
        # A manager must be instantiated here because it holds the gRPC
        # connection used to fetch the next configuration.
        blox_mgr = BloxManager(args)
        new_config = blox_mgr.rmserver.get_new_sim_config()
        if args.scheduler_name == "":
            # An empty scheduler name means there is nothing left to run.
            # NOTE(review): this inspects ``args`` (the previous/CLI value),
            # not ``new_config`` -- confirm that is intended.
            blox_mgr.terminate_server()
            break
        if new_config["scheduler"] == "":
            # NOTE(review): this exit path does not call terminate_server().
            break

        # Adopt the newly received configuration.
        args.scheduler_name = new_config["scheduler"]
        args.load = new_config["load"]
        args.placement_name = new_config["placement_policy"]
        args.acceptance_policy = new_config["acceptance_policy"]
        args.start_id_track = new_config["start_id_track"]
        args.stop_id_track = new_config["stop_id_track"]
        blox_mgr.reset(args)
        cluster_state = ClusterState(args)
        job_state = JobState(args)

        # Choose which policies to run for this configuration.
        placement_policy = _select_placement_policy(args)
        admission_policy = _select_admission_policy(args)
        scheduling_policy = _select_scheduling_policy(args)

        # Publish the active configuration through the environment.
        os.environ["sched_policy"] = args.scheduler_name
        os.environ["sched_load"] = str(args.load)

        while True:
            if blox_mgr.terminate:
                blox_mgr.terminate_server()
                print("Terminate current config {}".format(args))
                break

            blox_mgr.update_cluster(cluster_state)
            # TODO: Clean update metrics
            blox_mgr.update_metrics(cluster_state, job_state)

            new_jobs = blox_mgr.pop_wait_queue(args.simulate)
            accepted_jobs = admission_policy.accept(
                new_jobs,
                cluster_state,
                job_state,
            )
            print("GPU Df {}".format(cluster_state.gpu_df))
            cluster_state.gpu_df.to_csv("gpu_df.csv")
            job_state.add_new_jobs(accepted_jobs)

            new_job_schedule = scheduling_policy.schedule(job_state, cluster_state)
            to_suspend, to_launch = placement_policy.place(
                job_state, cluster_state, new_job_schedule
            )
            # Collect custom metrics for this round.
            utils.collect_custom_metrics(
                job_state, cluster_state, {"num_preemptions": len(to_suspend)}
            )
            blox_mgr.exec_jobs(to_launch, to_suspend, cluster_state, job_state)

            if args.simulate:
                # Advance the simulated clocks by one round.
                # NOTE: the admission policy's clock is not advanced here
                # (that update was commented out in the original code).
                blox_mgr.time += args.round_duration
                job_state.time += args.round_duration
def parse_args(parser, argv=None):
    """Register the scheduler's command-line options and parse them.

    Args:
        parser: the ``argparse.ArgumentParser`` to populate.
        argv: optional list of argument strings to parse. ``None`` (the
            default) lets ``argparse`` read the process command line, which
            is what the original one-argument call did.

    Returns:
        The ``argparse.Namespace`` holding the parsed values.
    """
    parser.add_argument(
        "--scheduler", default="Fifo", type=str, help="Name of the scheduling strategy"
    )
    parser.add_argument(
        "--scheduler-name",
        default="Fifo",
        type=str,
        help="Name of the scheduling strategy",
    )
    parser.add_argument(
        "--node-manager-port", default=50052, type=int, help="Node Manager RPC port"
    )
    parser.add_argument(
        "--central-scheduler-port",
        default=50051,
        type=int,
        help="Central Scheduler RPC Port",
    )
    parser.add_argument(
        "--simulator-rpc-port",
        default=50050,
        type=int,
        help="Simulator RPC port to fetch ",
    )
    # NOTE(review): the default "accept_all" is not one of the names that
    # main() matches ("AcceptAll", "LoadBasedAccept-1.2x",
    # "LoadBasedAccept-1.4x"); in simulation the value is overwritten by the
    # received configuration before it is used.
    parser.add_argument(
        "--acceptance-policy",
        default="accept_all",
        type=str,
        help="Name of acceptance policy",
    )
    # NOTE(review): the default "Las" is a scheduler name, not one of the
    # placement names main() matches; kept unchanged for compatibility.
    parser.add_argument(
        "--placement-name",
        default="Las",
        type=str,
        help="Name of the placement policy",
    )
    parser.add_argument(
        "--plot", action="store_true", default=False, help="Plot metrics"
    )
    parser.add_argument(
        "--exp-prefix", type=str, help="Unique name for prefix over log files"
    )
    parser.add_argument("--load", type=int, help="Number of jobs per hour")
    parser.add_argument("--simulate", action="store_true", help="Enable Simulation")
    parser.add_argument(
        "--round-duration", type=int, default=300, help="Round duration in seconds"
    )
    parser.add_argument(
        "--start-id-track", type=int, default=3000, help="Starting ID to track"
    )
    parser.add_argument(
        "--stop-id-track", type=int, default=4000, help="Stop ID to track"
    )
    return parser.parse_args(argv)
def _get_avg_jct(time_dict):
"""
Fetch the avg jct from the dict
"""
values = list(time_dict.values())
count = 0
jct_time = 0
for v in values:
jct_time += v[1] - v[0]
count += 1
return jct_time / count
if __name__ == "__main__":
    # Script entry point: configure logging, parse the command line, then run.
    setup_logging()
    cli_parser = argparse.ArgumentParser(
        description="Arguments for Starting the scheduler"
    )
    args = parse_args(cli_parser)
    main(args)