1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 // this implements a simple two layer Multi-GPU neural net
21 // this implementation uses mshadow-ps to get gradient aggregation
22 // between cards
23 // this code is modified from nnet.cu
24 #include <vector>
25 #include <cmath>
26 #include <omp.h>
27 // header file to use mshadow
28 #include <mshadow/tensor.h>
29 #include <mshadow-ps/mshadow_ps.h>
30 // helper function to load mnist dataset
31 #include "./util.h"
32 // this namespace contains all data structures, functions
33 using namespace mshadow;
34 // this namespace contains all operator overloads
35 using namespace mshadow::expr;
36
37 // define sigmoid operation
38 struct sigmoid {
Mapsigmoid39 MSHADOW_XINLINE static real_t Map(real_t a) {
40 return 1.0f / (1.0f + expf(-a));
41 }
42 };
43
44 /*! \brief interface for nnet, interfacd allows use to use GPU/CPU implementation in a unified way */
45 class INNet{
46 public:
47 virtual void Forward(const Tensor<cpu, 2, real_t>& inbatch,
48 Tensor<cpu, 2, real_t> &oubatch) = 0;
49 virtual void Backprop(const Tensor<cpu, 2, real_t>& gradout) = 0;
~INNet()50 virtual ~INNet() {}
51 };
52
53 /*!
54 * \brief simple two layer neural net
55 * this implementation is device invariant
56 */
57 template<typename xpu>
58 class NNet : public INNet {
59 public:
60 // initialize the network
NNet(int batch_size,int num_in,int num_hidden,int num_out,int devid,mshadow::ps::ISharedModel<xpu,real_t> * ps)61 NNet(int batch_size, int num_in, int num_hidden, int num_out,
62 int devid, mshadow::ps::ISharedModel<xpu, real_t> *ps)
63 : rnd(0), devid(devid), ps(ps) {
64 mshadow::SetDevice<xpu>(devid);
65 stream = mshadow::NewStream<xpu>();
66 // set the computing streams
67 ninput.set_stream(stream);
68 nhidden.set_stream(stream);
69 nhiddenbak.set_stream(stream);
70 nout.set_stream(stream);
71 hbias.set_stream(stream);
72 obias.set_stream(stream);
73 g_hbias.set_stream(stream);
74 g_obias.set_stream(stream);
75 Wi2h.set_stream(stream);
76 Wh2o.set_stream(stream);
77 g_Wi2h.set_stream(stream);
78 g_Wh2o.set_stream(stream);
79 rnd.set_stream(stream);
80 // setup nodes
81 ninput.Resize(Shape2(batch_size, num_in));
82 nhidden.Resize(Shape2(batch_size, num_hidden));
83 nhiddenbak.Resize(nhidden.shape_);
84 nout.Resize(Shape2(batch_size, num_out));
85 // setup bias
86 hbias.Resize(Shape1(num_hidden)); g_hbias.Resize(hbias.shape_);
87 obias.Resize(Shape1(num_out)); g_obias.Resize(obias.shape_);
88 hbias = 0.0f; obias = 0.0f;
89 // setup weights
90 Wi2h.Resize(Shape2(num_in, num_hidden)); g_Wi2h.Resize(Wi2h.shape_);
91 Wh2o.Resize(Shape2(num_hidden, num_out)); g_Wh2o.Resize(Wh2o.shape_);
92 rnd.SampleGaussian(&Wi2h, 0, 0.01f);
93 rnd.SampleGaussian(&Wh2o, 0, 0.01f);
94 // initialize the key
95 ps->InitKey(Wi2h.shape_, 0, devid);
96 ps->InitKey(hbias.shape_, 1, devid);
97 ps->InitKey(Wh2o.shape_, 2, devid);
98 ps->InitKey(obias.shape_, 3, devid);
99 }
~NNet()100 virtual ~NNet() {
101 mshadow::SetDevice<xpu>(devid);
102 mshadow::DeleteStream(stream);
103 }
104 // forward propagation
Forward(const Tensor<cpu,2,real_t> & inbatch,Tensor<cpu,2,real_t> & oubatch)105 virtual void Forward(const Tensor<cpu, 2, real_t> &inbatch,
106 Tensor<cpu, 2, real_t> &oubatch) {
107 // size is same conventsion as numpy
108 index_t batch_size = inbatch.size(0);
109 // copy data to input layer
110 Copy(ninput, inbatch, stream);
111 // wait the last pull requst on layer to complete
112 ps->PullWait(0, devid);
113 // first layer, fullc
114 nhidden = dot(ninput, Wi2h);
115 // wait the pull request on hbias to complete
116 ps->PullWait(1, devid);
117 nhidden+= repmat(hbias, batch_size);
118 // activation, sigmloid, backup activation in nhidden
119 nhidden = F<sigmoid>(nhidden);
120 Copy(nhiddenbak, nhidden, stream);
121 // second layer fullc
122 ps->PullWait(2, devid);
123 nout = dot(nhiddenbak, Wh2o);
124 ps->PullWait(3, devid);
125 nout += repmat(obias, batch_size);
126 // softmax calculation
127 Softmax(nout, nout);
128 // copy result out
129 Copy(oubatch, nout, stream);
130 // Copy with stream is non-blocking, use wait to wait until copy finishes
131 stream->Wait();
132 }
133 // back propagation
Backprop(const Tensor<cpu,2,real_t> & gradout)134 virtual void Backprop(const Tensor<cpu, 2, real_t> &gradout) {
135 // copy gradient to output layer
136 Copy(nout, gradout, stream);
137 // calc grad of layer 2
138 g_obias = sum_rows(nout);
139 // sync proc defines the synchronization step
140 this->SyncProc(obias, g_obias, 3);
141 // update second layer weights
142 g_Wh2o = dot(nhiddenbak.T(), nout);
143 // backprop to layer 1
144 nhiddenbak = dot(nout, Wh2o.T());
145 this->SyncProc(Wh2o, g_Wh2o, 2);
146 // calculate gradient of sigmoid layer
147 nhidden = nhidden * (1.0f-nhidden) * nhiddenbak;
148 // calc grad of layer 1
149 g_hbias = sum_rows(nhidden);
150 this->SyncProc(hbias, g_hbias, 1);
151 g_Wi2h = dot(ninput.T(), nhidden);
152 this->SyncProc(Wi2h, g_Wi2h, 0);
153 }
154 // synchronization function
155 template<int dim>
SyncProc(mshadow::Tensor<xpu,dim> weight,mshadow::Tensor<xpu,dim> grad,int data_key)156 inline void SyncProc(mshadow::Tensor<xpu, dim> weight,
157 mshadow::Tensor<xpu, dim> grad,
158 int data_key) {
159 // wait till last computation finishes
160 stream->Wait();
161 ps->Push(grad, data_key, devid, -data_key);
162 ps->PullReq(grad, data_key, devid, -data_key,
163 UpdateEntry::ApplyUpdate,
164 new UpdateEntry(weight.FlatTo2D(), grad.FlatTo2D(), dim == 1));
165 }
166 // data structure defined to help using callback function
167 struct UpdateEntry {
168 mshadow::Tensor<xpu, 2> weight;
169 mshadow::Tensor<xpu, 2> grad;
170 bool is_bias;
171 // constructor
UpdateEntryNNet::UpdateEntry172 UpdateEntry(mshadow::Tensor<xpu, 2> weight,
173 mshadow::Tensor<xpu, 2> grad,
174 bool is_bias)
175 : weight(weight), grad(grad),
176 is_bias(is_bias) {}
UpdateNNet::UpdateEntry177 inline void Update(mshadow::Stream<xpu> *stream) {
178 weight.set_stream(stream);
179 const float wd = 0.00001;
180 const float eta = 0.8;
181 if (!is_bias) {
182 weight -= eta * (wd * weight + grad);
183 } else {
184 weight -= eta * grad;
185 }
186 }
187 // callback function to apply update
ApplyUpdateNNet::UpdateEntry188 inline static void ApplyUpdate(mshadow::Stream<xpu> *stream, void *arg) {
189 UpdateEntry *e = static_cast<UpdateEntry*>(arg);
190 e->Update(stream);
191 delete e;
192 }
193 };
194
195 private:
196 // computing stream
197 mshadow::Stream<xpu> *stream;
198 // device id
199 int devid;
200 // parameter server interface
201 mshadow::ps::ISharedModel<xpu, real_t> *ps;
202 // random seed generator
203 Random<xpu, real_t> rnd;
204 // nodes in neural net
205 TensorContainer<xpu, 2, real_t> ninput, nhidden, nhiddenbak, nout;
206 // hidden bias, gradient
207 TensorContainer<xpu, 1, real_t> hbias, obias, g_hbias, g_obias;
208 // weight gradient
209 TensorContainer<xpu, 2, real_t> Wi2h, Wh2o, g_Wi2h, g_Wh2o;
210 };
211
212 // helper function to get the max inde
MaxIndex(Tensor<cpu,1,real_t> pred)213 inline int MaxIndex(Tensor<cpu, 1, real_t> pred) {
214 int maxidx = 0;
215 for(index_t i = 1; i < pred.size(0); ++i) {
216 if(pred[i] > pred[maxidx]) maxidx = (int)i;
217 }
218 return maxidx;
219 }
220
221 namespace mshadow {
222 namespace ps {
223 // model updater is used when update is happening on server side
224 // if we only use parameter server for sum aggregation
225 // this is not needed, but we must declare this function to return NULL
226 template<>
CreateModelUpdater(void)227 IModelUpdater<float> *CreateModelUpdater(void) {
228 return NULL;
229 }
230 }
231 }
232
233 template<typename xpu>
Run(int argc,char * argv[])234 inline int Run(int argc, char *argv[]) {
235 srand(0);
236 // settings
237 int batch_size = 100;
238 int num_in = 28 * 28;
239 int num_hidden = 100;
240 int num_out = 10;
241 int ndev = argc - 2;
242 if (batch_size % ndev != 0) {
243 fprintf(stderr, "choose number of devices ndev such that 100 MOD ndev == 0\n");
244 return 0;
245 }
246 // choose which version to use
247 std::vector<int> devs;
248 for (int i = 2; i < argc; ++i) {
249 devs.push_back(atoi(argv[i]));
250 }
251 mshadow::ps::ISharedModel<xpu, real_t>
252 *ps = mshadow::ps::CreateSharedModel<xpu, real_t>("local");
253 ps->Init(devs);
254
255 std::vector<INNet *> nets(ndev);
256 for (int i = 0; i < ndev; ++i) {
257 mshadow::InitTensorEngine<xpu>(devs[i]);
258 nets[i] = new NNet<xpu>(batch_size / ndev, num_in, num_hidden, num_out, devs[i], ps);
259 }
260
261 // label
262 std::vector<int> ytrain, ytest;
263 // data
264 TensorContainer<cpu,2> xtrain, xtest;
265 LoadMNIST("train-images-idx3-ubyte", "train-labels-idx1-ubyte", ytrain, xtrain, true);
266 LoadMNIST("t10k-images-idx3-ubyte", "t10k-labels-idx1-ubyte", ytest, xtest, false);
267 int num_iter = 20;
268
269 for (int i = 0; i < num_iter; ++ i) {
270 // mini-batch per device
271 int step = batch_size / ndev;
272 // running parallel threads
273 #pragma omp parallel num_threads(ndev)
274 {
275 // temp output layer
276 TensorContainer<cpu, 2, real_t> pred;
277 pred.Resize(Shape2(step, num_out));
278 int tid = omp_get_thread_num();
279 mshadow::SetDevice<xpu>(devs[tid]);
280 for (index_t j = 0; j + batch_size <= xtrain.size(0); j += batch_size) {
281 nets[tid]->Forward(xtrain.Slice(j + tid * step, j + (tid + 1) * step), pred);
282 // set gradient into pred
283 for (int k = 0; k < step; ++ k) {
284 pred[k][ytrain[j + tid * step + k]] -= 1.0f;
285 }
286 // scale gradient by batchs zie
287 pred *= 1.0f / batch_size;
288 // run backprop
289 nets[tid]->Backprop(pred);
290 }
291 }
292 // evaluation
293 long nerr = 0;
294 #pragma omp parallel num_threads(ndev) reduction(+:nerr)
295 {
296 // temp output layer
297 TensorContainer<cpu, 2, real_t> pred;
298 pred.Resize(Shape2(step, num_out));
299 int tid = omp_get_thread_num();
300 mshadow::SetDevice<xpu>(devs[tid]);
301 for (index_t j = 0; j + batch_size <= xtest.size(0); j += batch_size) {
302 nets[tid]->Forward(xtest.Slice(j + tid * step, j + (tid + 1) * step), pred);
303 for (int k = 0; k < step; ++ k) {
304 nerr += MaxIndex(pred[k]) != ytest[j + tid * step + k];
305 }
306 }
307 }
308 printf("round %d: test-err=%f\n", i, (float)nerr/xtest.size(0));
309 }
310
311 for(int i = 0; i < ndev; ++i) {
312 mshadow::SetDevice<xpu>(devs[i]);
313 delete nets[i];
314 ShutdownTensorEngine<xpu>();
315 }
316 return 0;
317 }
main(int argc,char * argv[])318 int main(int argc, char *argv[]) {
319 if (argc < 3) {
320 printf("Usage: <device> devicelist\n"\
321 "\tExample1: ./nnet_ps cpu 1 2 3\n"\
322 "\tExample2: ./nnet_ps gpu 0 1\n");
323 return 0;
324 }
325 if (!strcmp(argv[1], "cpu")) {
326 Run<mshadow::cpu>(argc, argv);
327 } else {
328 Run<mshadow::gpu>(argc, argv);
329 }
330 return 0;
331 }
332