forked from dmlc/mshadow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dist_async_sum-inl.h
124 lines (109 loc) · 3.53 KB
/
dist_async_sum-inl.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/**
* @brief Simple test of KVLayer: asynchronous distributed summation through the mshadow-ps "dist" shared model
*/
#include "ps.h"
#include "parameter/kv_layer.h"
#include <cstdio>
#include <iostream>
#include <omp.h>
#include <map>
#include <mshadow/tensor.h>
#include <mshadow-ps/mshadow_ps.h>
#include "dbstr.h"
#include "glog/logging.h"
namespace mshadow {
namespace ps {
/*!
 * \brief model updater used by this test: each update received for a key
 *  is added element-wise into the tensor registered for that key, so the
 *  stored value is the running sum of every value pushed for it.
 * \tparam DType element type of the stored tensors
 */
template<typename DType>
class Updater : public IModelUpdater<DType> {
 protected:
  // Zero-fill the supplied buffer and register it as the storage for `key`.
  // NOTE(review): the map keeps the Tensor handle by value; in mshadow that
  // is a lightweight view of the buffer rather than a deep copy -- confirm
  // the caller keeps the buffer alive for as long as updates arrive.
  void InitModel_(int key, Tensor<cpu, 1, DType> weight) {
    weight = 0;
    data_[key] = weight;
  }
  // Accumulate `delta` element-wise into the tensor stored for `key`.
  // Uses operator[] exactly like a direct `data_[key] += delta`.
  void Update_(int key, Tensor<cpu, 1, DType> delta) {
    Tensor<cpu, 1, DType> &accumulator = data_[key];
    accumulator += delta;
  }
  /*! \brief per-key storage registered by InitModel_ */
  std::map<int, Tensor<cpu, 1, DType> > data_;
};
/*!
 * \brief factory returning a freshly heap-allocated Updater.
 *  Presumably the parameter-server layer owns and deletes the result --
 *  confirm against the caller.
 */
template<typename DType>
IModelUpdater<DType> *CreateModelUpdater(void) {
  IModelUpdater<DType> *updater = new Updater<DType>();
  return updater;
}
}  // namespace ps
}  // namespace mshadow
// Worker body executed on its own thread for a single device.
// It registers two keys with the shared model, pulls their initial values,
// derives row 1 from row 0, then pushes both rows and pulls back the
// synchronized values, logging the local tensor before and after.
//
// \param devid device id driven by this thread; passed to InitTensorEngine
//        and used as the device argument of every shared-model call
// \param ps    shared model created by the caller; not owned or deleted here
template<typename xpu>
inline void RunWorkerThread(int devid,
                            mshadow::ps::ISharedModel<xpu, float> *ps) {
  // initialize the tensor engine for this device
  mshadow::InitTensorEngine<xpu>(devid);
  mshadow::Stream<xpu> *stream = mshadow::NewStream<xpu>();
  {
    // `data` lives in its own scope so its buffer is released before the
    // stream it is bound to is deleted and before the tensor engine is shut
    // down. Without these braces its destructor would only run at function
    // exit, i.e. after DeleteStream and ShutdownTensorEngine.
    mshadow::TensorContainer<xpu, 2> data(mshadow::Shape2(2, 3));
    // make subsequent computation targeting `data` use the new stream;
    // a stream is needed for asynchronous execution on GPU
    data.set_stream(stream);
    // initialize keys 0 and 1, registering each row's shape with the
    // parameter server
    ps->InitKey(data[0].shape_, 0, devid);
    ps->InitKey(data[1].shape_, 1, devid);
    // first step: pull the initial values back from the server
    ps->PullReq(data[0], 0, devid);
    ps->PullReq(data[1], 1, devid);
    // PullWait blocks until the corresponding pull request finishes
    ps->PullWait(0, devid);
    ps->PullWait(1, devid);
    data[1] = devid + data[0];
    LOG(ERROR) << "node " << ::ps::MyNodeID() << ", dev " << devid
               << ": before sync\n" << dbstr(data);
    // push data[0] under key 0 for update or aggregation, then request the
    // result to be copied back once that work is done
    ps->Push(data[0], 0, devid);
    ps->PullReq(data[0], 0, devid);
    // independent computation placed here would overlap with the transfer
    ps->PullWait(0, devid);
    // same push / pull-request / wait sequence for data[1] under key 1
    ps->Push(data[1], 1, devid);
    ps->PullReq(data[1], 1, devid);
    // more computation could be overlapped here before waiting
    ps->PullWait(1, devid);
    LOG(ERROR) << "node " << ::ps::MyNodeID() << ", dev " << devid
               << ": after sync\n" << dbstr(data);
  }
  mshadow::DeleteStream(stream);
  mshadow::ShutdownTensorEngine<xpu>();
}
template<typename xpu>
inline int Run(int argc, char *argv[]) {
if (argc < 2) {
printf("Usage: device list\n"\
"\tfor CPU the device list can be arbitrary\n"\
"\tfor GPU the device list need to be actual device index\n");
return 0;
}
// list of device ids
std::vector<int> devs;
// initialization
for (int i = 1; i < argc; ++i) {
// record the device id
devs.push_back(atoi(argv[i]));
}
mshadow::ps::ISharedModel<xpu, float>
*ps = mshadow::ps::CreateSharedModel<xpu, float>("dist");
// intiaialize the ps
ps->SetParam("update_on_server", "1");
ps->Init(devs);
// use openmp to launch #devs threads
#pragma omp parallel num_threads(devs.size())
{
int tid = omp_get_thread_num();
RunWorkerThread<xpu>(devs[tid], ps);
}
delete ps;
return 0;
}