-
Notifications
You must be signed in to change notification settings - Fork 10
/
las_scheduler.py
168 lines (145 loc) · 5.71 KB
/
las_scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import os
import warnings
import sys
import copy
import argparse
warnings.simplefilter(action="ignore", category=FutureWarning)
# from job_admission_policy import accept_all
import schedulers
from placement import placement
import admission_control
# from acceptance_policy import load_based_accept
from blox import ClusterState, JobState, BloxManager
import blox.utils as utils
def parse_args(parser: argparse.ArgumentParser, argv=None) -> argparse.Namespace:
    """Register the scheduler's command-line options and parse them.

    Args:
        parser: The ``argparse.ArgumentParser`` to which the options are added.
        argv: Optional list of argument strings to parse. When ``None``
            (the default) the arguments are taken from ``sys.argv``, which is
            the behaviour callers relied on before this parameter existed.

    Returns:
        The ``argparse.Namespace`` produced by ``parser.parse_args``.
    """
    # --- RPC endpoints and policy selection -------------------------------
    # Both ``--scheduler`` and ``--scheduler-name`` are accepted; they are
    # stored as separate attributes (``scheduler`` and ``scheduler_name``).
    parser.add_argument(
        "--scheduler", default="Las", type=str, help="Name of the scheduling strategy"
    )
    parser.add_argument(
        "--node-manager-port", default=50052, type=int, help="Node Manager RPC port"
    )
    parser.add_argument(
        "--central-scheduler-port",
        default=50051,
        type=int,
        help="Central Scheduler RPC Port",
    )
    parser.add_argument(
        "--simulator-rpc-port",
        default=50050,
        type=int,
        help="Simulator RPC port to fetch ",
    )
    parser.add_argument(
        "--scheduler-name",
        default="Las",
        type=str,
        help="Name of the scheduling strategy",
    )
    parser.add_argument(
        "--placement-name",
        default="Las",
        type=str,
        # Previously a copy-paste of the scheduler help text.
        help="Name of the placement policy",
    )
    parser.add_argument(
        "--acceptance-policy",
        default="accept_all",
        type=str,
        help="Name of acceptance policy",
    )

    # --- Experiment / simulation controls ---------------------------------
    parser.add_argument(
        "--plot", action="store_true", default=False, help="Plot metrics"
    )
    parser.add_argument(
        "--exp-prefix", type=str, help="Unique name for prefix over log files"
    )
    parser.add_argument("--load", type=int, help="Number of jobs per hour")
    parser.add_argument("--simulate", action="store_true", help="Enable Simulation")
    parser.add_argument(
        "--round-duration", type=int, default=300, help="Round duration in seconds"
    )
    parser.add_argument(
        "--start-id-track", type=int, default=3000, help="Starting ID to track"
    )
    parser.add_argument(
        "--stop-id-track", type=int, default=4000, help="Stop ID to track"
    )

    # ``argv=None`` makes argparse fall back to ``sys.argv[1:]``.
    return parser.parse_args(argv)
def main(args):
    """Run the LAS scheduling loop for one simulator-provided configuration.

    Builds the placement, scheduling and admission policy objects. When
    ``args.simulate`` is set, it then creates a ``BloxManager``, fetches a
    configuration from the simulator, copies that configuration onto ``args``
    and the manager, and iterates scheduling rounds until the manager's
    ``terminate`` attribute is truthy.

    Args:
        args: Parsed command-line namespace (see ``parse_args``). The fields
            ``scheduler_name``, ``load``, ``start_id_track`` and
            ``stop_id_track`` are overwritten from the simulator config.

    Side effects:
        Sets ``os.environ["sched_policy"]`` and ``os.environ["sched_load"]``,
        prints progress to stdout, and calls ``sys.exit()`` if
        ``args.scheduler_name`` is the empty string.
    """
    # The scheduler runs consolidated placement, with the LAS scheduler and
    # the accept-all admission policy
    placement_policy = placement.JobPlacement(args)
    # Running LAS scheduling policy
    scheduling_policy = schedulers.Las(args)
    admission_policy = admission_control.acceptAll(args)
    # NOTE(review): blox_instance is only created inside this branch, so the
    # whole run loop is kept under it — confirm how non-simulated runs are
    # expected to start.
    if args.simulate:
        # for simulation we get the config from the simulator
        # The config helps in providing file names and initializing the run
        blox_instance = BloxManager(args)
        new_config = blox_instance.rmserver.get_new_sim_config()
        print(f"New config {new_config}")
        # NOTE(review): this tests the command-line value (default "Las"),
        # not new_config["scheduler"] — confirm which one is intended.
        if args.scheduler_name == "":
            # terminate the blox instance before exiting
            # if no scheduler provided break
            blox_instance.terminate_server()
            print("No Config Sent")
            sys.exit()
        # Mirror the simulator-supplied settings onto the manager and args
        blox_instance.scheduler_name = new_config["scheduler"]
        blox_instance.load = new_config["load"]
        args.scheduler_name = new_config["scheduler"]
        args.load = new_config["load"]
        args.start_id_track = new_config["start_id_track"]
        args.stop_id_track = new_config["stop_id_track"]
        print(
            f"Running Scheduler {args.scheduler_name}\nLoad {args.load} \n Placement Policy {args.placement_name} \nAcceptance Policy {args.acceptance_policy} \nTracking jobs from {args.start_id_track} to {args.stop_id_track}"
        )
        blox_instance.reset(args)
        # State objects are built after args has been updated from the config
        cluster_state = ClusterState(args)
        job_state = JobState(args)
        os.environ["sched_policy"] = args.scheduler_name
        os.environ["sched_load"] = str(args.load)
        # Local round clock; it is advanced below but not read in this function
        simulator_time = 0
        while True:
            # stop once the manager signals termination
            if blox_instance.terminate:
                blox_instance.terminate_server()
                print("Terminate current config {}".format(args))
                break
            # get new nodes for the cluster
            blox_instance.update_cluster(cluster_state)
            blox_instance.update_metrics(cluster_state, job_state)
            # get simulator jobs
            new_jobs = blox_instance.pop_wait_queue(args.simulate)
            # run the admission policy over the newly arrived jobs
            accepted_jobs = admission_policy.accept(new_jobs, cluster_state, job_state)
            job_state.add_new_jobs(accepted_jobs)
            # NOTE(review): this result is overwritten by the second
            # schedule() call after pruning — confirm whether this call is
            # needed for its side effects.
            new_job_schedule = scheduling_policy.schedule(job_state, cluster_state)
            # prune jobs - get rid of finished jobs
            utils.prune_jobs(job_state, cluster_state, blox_instance)
            # perform scheduling
            new_job_schedule = scheduling_policy.schedule(job_state, cluster_state)
            # get placement
            to_suspend, to_launch = placement_policy.place(
                job_state, cluster_state, new_job_schedule
            )
            # record the number of jobs being suspended this round as preemptions
            utils.collect_custom_metrics(
                job_state, cluster_state, {"num_preemptions": len(to_suspend)}
            )
            utils.collect_cluster_job_metrics(job_state, cluster_state)
            # check if we have finished every job to track
            utils.track_finished_jobs(job_state, cluster_state, blox_instance)
            # execute jobs
            blox_instance.exec_jobs(to_launch, to_suspend, cluster_state, job_state)
            # update time: advance every clock by one round
            simulator_time += args.round_duration
            job_state.time += args.round_duration
            cluster_state.time += args.round_duration
            blox_instance.time += args.round_duration
if __name__ == "__main__":
    # Script entry point: build the CLI parser, resolve the options, then
    # hand them to the scheduling loop.
    cli_parser = argparse.ArgumentParser(
        description="Arguments for Starting the scheduler"
    )
    args = parse_args(cli_parser)
    main(args)