# time_calculation.py
"""Benchmark script: measures the time spent in all-reduce versus point-to-point
(isend/irecv) exchange of embedding tensors across the nodes of a torch.distributed
NCCL process group, with per-iteration tensor sizes read from a CSV file."""
import time
import copy
import math
import argparse
import torch
import csv


def parse_args(parser):
    """
    Parse command-line arguments.
    """
    parser.add_argument("--master-ip", required=True, type=str)
    parser.add_argument("--num-nodes", required=True, type=int)
    parser.add_argument("--rank", required=True, type=int)
    # Note: --vector-size is parsed but not used in main(); the per-iteration
    # sizes come from --vector-size-file instead.
    parser.add_argument("--vector-size", required=True, type=int)
    parser.add_argument("--vector-size-file", required=True, type=str)
    args = parser.parse_args()
    return args
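
# The --vector-size-file CSV is read with csv.DictReader in main(), so it needs a
# header row containing at least the "average" and "all_reduce" columns used there.
# An illustrative layout (the values below are made up, not from the original data):
#
#   average,all_reduce
#   4096.0,1024
#   8192.0,2048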


def main(args):
    # Join the NCCL process group. The script runs one rank per node
    # (world_size == num_nodes), so each rank uses cuda:0 on its own node.
    torch.distributed.init_process_group(
        backend="nccl",
        init_method=f"tcp://{args.master_ip}:6585",
        rank=args.rank,
        world_size=args.num_nodes,
    )
    emb_stats_file = open(args.vector_size_file, "r")
    reader = csv.DictReader(emb_stats_file)
    # memory_range = [0.5, 1, 2, 4, 8, 16, 32, 64, 128]
    # memory_range = [6]
    # for m in memory_range:
    total_time_p2p = 0
    total_time_all_reduce = 0
    for row in reader:
        # "average" gives the total number of embeddings to read, split evenly
        # across the other (num_nodes - 1) peers for the point-to-point exchange.
        number_of_embeddings_to_read = float(row["average"])
        # mb * 1024 *1024 / (number of floa
        embs_all_reduce = int(row["all_reduce"])
        avg_emb_to_read_write = int(number_of_embeddings_to_read / (args.num_nodes - 1))
        # One 64-wide float32 payload sent to every peer, one receive buffer per
        # peer, and one tensor sized for the all-reduce measurement.
        embedding_send = torch.rand(
            (avg_emb_to_read_write, 64), dtype=torch.float32, device="cuda:0"
        )
        embedding_buffer = [
            torch.zeros_like(embedding_send) for _ in range(args.num_nodes - 1)
        ]
        embedding_to_all_reduce = torch.rand(
            (embs_all_reduce, 64), dtype=torch.float32, device="cuda:0"
        )
        # Time the all-reduce. Synchronize first so previously queued GPU work
        # (the torch.rand fills above) is not folded into the measurement.
        torch.cuda.synchronize()
        start_all_reduce = time.time()
        torch.distributed.all_reduce(embedding_to_all_reduce)
        torch.cuda.synchronize()
        end_all_reduce = time.time()
        # print("Time all reduce {}".format(end_all_reduce - start_all_reduce))
        total_time_all_reduce += end_all_reduce - start_all_reduce
        # Point-to-point exchange: every rank posts a non-blocking send of its
        # payload to, and a non-blocking receive from, every other rank, then
        # waits on the receives. The exchange is performed twice, matching the
        # read-and-write naming of avg_emb_to_read_write.
        start_time = time.time()
        count = 0
        futures_list = list()
        for i in range(args.num_nodes):
            if args.rank != i:
                torch.distributed.isend(embedding_send, i)
                request_obj = torch.distributed.irecv(embedding_buffer[count], src=i)
                futures_list.append(request_obj)
                count += 1
        for obj in futures_list:
            obj.wait()
        torch.cuda.synchronize()
        # Second round of the same exchange.
        count = 0
        futures_list = list()
        for i in range(args.num_nodes):
            if args.rank != i:
                torch.distributed.isend(embedding_send, i)
                request_obj = torch.distributed.irecv(embedding_buffer[count], src=i)
                futures_list.append(request_obj)
                count += 1
        for obj in futures_list:
            obj.wait()
        torch.cuda.synchronize()
        end_time = time.time()
        # print("Send and receive {}".format(end_time - start_time))
        total_time_p2p += end_time - start_time
# print("Emb size {}".format(m))
print("Total all reduce = {}".format(total_time_all_reduce))
print("Total time p2p = {}".format(total_time_p2p))
return None


if __name__ == "__main__":
    args = parse_args(argparse.ArgumentParser(description="Arguments for All reduce"))
    main(args)
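
# A minimal launch sketch, assuming two nodes with one GPU each and that TCP port
# 6585 on the master is reachable; the IP address, vector size, and CSV path below
# are placeholders, not values from the original repository:
#
#   # on node 0 (the master, e.g. 10.0.0.1)
#   python time_calculation.py --master-ip 10.0.0.1 --num-nodes 2 --rank 0 \
#       --vector-size 64 --vector-size-file emb_stats.csv
#
#   # on node 1
#   python time_calculation.py --master-ip 10.0.0.1 --num-nodes 2 --rank 1 \
#       --vector-size 64 --vector-size-file emb_stats.csv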