/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

// this implements a simple two-layer multi-GPU neural net
// it uses mshadow-ps to aggregate gradients between cards
// this code is modified from nnet.cu
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <vector>
#include <cmath>
#include <omp.h>
// header file to use mshadow
#include <mshadow/tensor.h>
#include <mshadow-ps/mshadow_ps.h>
// helper function to load mnist dataset
#include "./util.h"
// this namespace contains all data structures, functions
using namespace mshadow;
// this namespace contains all operator overloads
using namespace mshadow::expr;

// define sigmoid operation
struct sigmoid {
  MSHADOW_XINLINE static real_t Map(real_t a) {
    return 1.0f / (1.0f + expf(-a));
  }
};
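// mshadow applies such a functor elementwise through the F<OP>
// expression template, e.g. F<sigmoid>(nhidden) below maps Map()
// over every element of the tensor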

/*! \brief interface for nnet; the interface lets us use the GPU/CPU implementations in a unified way */
class INNet {
 public:
  virtual void Forward(const Tensor<cpu, 2, real_t> &inbatch,
                       Tensor<cpu, 2, real_t> &oubatch) = 0;
  virtual void Backprop(const Tensor<cpu, 2, real_t> &gradout) = 0;
  virtual ~INNet() {}
};

/*!
 * \brief simple two-layer neural net
 *        this implementation is device invariant
 */
template<typename xpu>
class NNet : public INNet {
 public:
  // initialize the network
  NNet(int batch_size, int num_in, int num_hidden, int num_out,
       int devid, mshadow::ps::ISharedModel<xpu, real_t> *ps)
      : rnd(0), devid(devid), ps(ps) {
    mshadow::SetDevice<xpu>(devid);
    stream = mshadow::NewStream<xpu>();
    // set the computing streams
    ninput.set_stream(stream);
    nhidden.set_stream(stream);
    nhiddenbak.set_stream(stream);
    nout.set_stream(stream);
    hbias.set_stream(stream);
    obias.set_stream(stream);
    g_hbias.set_stream(stream);
    g_obias.set_stream(stream);
    Wi2h.set_stream(stream);
    Wh2o.set_stream(stream);
    g_Wi2h.set_stream(stream);
    g_Wh2o.set_stream(stream);
    rnd.set_stream(stream);
    // setup nodes
    ninput.Resize(Shape2(batch_size, num_in));
    nhidden.Resize(Shape2(batch_size, num_hidden));
    nhiddenbak.Resize(nhidden.shape_);
    nout.Resize(Shape2(batch_size, num_out));
    // setup bias
    hbias.Resize(Shape1(num_hidden)); g_hbias.Resize(hbias.shape_);
    obias.Resize(Shape1(num_out)); g_obias.Resize(obias.shape_);
    hbias = 0.0f; obias = 0.0f;
    // setup weights
    Wi2h.Resize(Shape2(num_in, num_hidden));  g_Wi2h.Resize(Wi2h.shape_);
    Wh2o.Resize(Shape2(num_hidden, num_out)); g_Wh2o.Resize(Wh2o.shape_);
    rnd.SampleGaussian(&Wi2h, 0, 0.01f);
    rnd.SampleGaussian(&Wh2o, 0, 0.01f);
    // register each parameter blob with the parameter server under a
    // unique integer key; gradients pushed under the same key from
    // different devices are aggregated together
    ps->InitKey(Wi2h.shape_, 0, devid);
    ps->InitKey(hbias.shape_, 1, devid);
    ps->InitKey(Wh2o.shape_, 2, devid);
    ps->InitKey(obias.shape_, 3, devid);
  }
  virtual ~NNet() {
    mshadow::SetDevice<xpu>(devid);
    mshadow::DeleteStream(stream);
  }
  // forward propagation
  virtual void Forward(const Tensor<cpu, 2, real_t> &inbatch,
                       Tensor<cpu, 2, real_t> &oubatch) {
    // size follows the same convention as numpy
    index_t batch_size = inbatch.size(0);
    // copy data to input layer
    Copy(ninput, inbatch, stream);
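    // the PullReq issued during the previous Backprop is asynchronous;
    // PullWait blocks until the updated weights for the given key have
    // arrived on this device, so each layer's parameters are fresh just
    // before they are used, while communication for the other layers
    // still overlaps with computation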
    // wait for the last pull request on Wi2h (key 0) to complete
    ps->PullWait(0, devid);
    // first layer, fullc
    nhidden = dot(ninput, Wi2h);
    // wait for the pull request on hbias to complete
    ps->PullWait(1, devid);
    nhidden += repmat(hbias, batch_size);
    // activation, sigmoid, backup activation in nhidden
    nhidden = F<sigmoid>(nhidden);
    Copy(nhiddenbak, nhidden, stream);
    // second layer, fullc
    ps->PullWait(2, devid);
    nout = dot(nhiddenbak, Wh2o);
    ps->PullWait(3, devid);
    nout += repmat(obias, batch_size);
    // softmax calculation
    Softmax(nout, nout);
    // copy result out
    Copy(oubatch, nout, stream);
    // Copy with a stream is non-blocking; wait until the copy finishes
    stream->Wait();
  }
  // back propagation
  virtual void Backprop(const Tensor<cpu, 2, real_t> &gradout) {
    // copy gradient to output layer
    Copy(nout, gradout, stream);
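    // gradients are pushed to the server in reverse layer order, as
    // soon as each one is computed; this lets the communication of
    // later layers overlap with the remaining backprop computation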
    // calc grad of layer 2
    g_obias = sum_rows(nout);
    // SyncProc defines the synchronization step
    this->SyncProc(obias, g_obias, 3);
    // grad of second layer weights
    g_Wh2o = dot(nhiddenbak.T(), nout);
    // backprop to layer 1
    nhiddenbak = dot(nout, Wh2o.T());
    this->SyncProc(Wh2o, g_Wh2o, 2);
    // gradient of sigmoid: dL/dx = h * (1 - h) * dL/dh,
    // reusing the activations h saved in nhidden
    nhidden = nhidden * (1.0f - nhidden) * nhiddenbak;
    // calc grad of layer 1
    g_hbias = sum_rows(nhidden);
    this->SyncProc(hbias, g_hbias, 1);
    g_Wi2h = dot(ninput.T(), nhidden);
    this->SyncProc(Wi2h, g_Wi2h, 0);
  }
  // synchronization function
  template<int dim>
  inline void SyncProc(mshadow::Tensor<xpu, dim> weight,
                       mshadow::Tensor<xpu, dim> grad,
                       int data_key) {
    // wait till the last computation finishes
    stream->Wait();
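    // Push is non-blocking: it hands the gradient to the server for
    // aggregation across devices; PullReq then registers ApplyUpdate
    // as a callback that runs once the aggregated gradient is ready.
    // The -data_key argument is the scheduling priority (in mshadow-ps
    // a larger value means more urgent), so key 0, whose weights are
    // needed first in the next forward pass, gets the highest priority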
    ps->Push(grad, data_key, devid, -data_key);
    ps->PullReq(grad, data_key, devid, -data_key,
                UpdateEntry::ApplyUpdate,
                new UpdateEntry(weight.FlatTo2D(), grad.FlatTo2D(), dim == 1));
  }
  // data structure defined to help use the callback function
  struct UpdateEntry {
    mshadow::Tensor<xpu, 2> weight;
    mshadow::Tensor<xpu, 2> grad;
    bool is_bias;
    // constructor
    UpdateEntry(mshadow::Tensor<xpu, 2> weight,
                mshadow::Tensor<xpu, 2> grad,
                bool is_bias)
        : weight(weight), grad(grad),
          is_bias(is_bias) {}
    inline void Update(mshadow::Stream<xpu> *stream) {
      weight.set_stream(stream);
      // weight decay and learning rate
      const float wd = 0.00001;
      const float eta = 0.8;
      // SGD update; weight decay is not applied to the bias terms
      if (!is_bias) {
        weight -= eta * (wd * weight + grad);
      } else {
        weight -= eta * grad;
      }
    }
    // callback function to apply the update
    inline static void ApplyUpdate(mshadow::Stream<xpu> *stream, void *arg) {
      UpdateEntry *e = static_cast<UpdateEntry*>(arg);
      e->Update(stream);
      delete e;
    }
  };

 private:
  // computing stream
  mshadow::Stream<xpu> *stream;
  // device id
  int devid;
  // parameter server interface
  mshadow::ps::ISharedModel<xpu, real_t> *ps;
  // random number generator
  Random<xpu, real_t> rnd;
  // nodes in neural net
  TensorContainer<xpu, 2, real_t> ninput, nhidden, nhiddenbak, nout;
  // hidden/output bias and their gradients
  TensorContainer<xpu, 1, real_t> hbias, obias, g_hbias, g_obias;
  // weights and their gradients
  TensorContainer<xpu, 2, real_t> Wi2h, Wh2o, g_Wi2h, g_Wh2o;
};

// helper function to get the max index
inline int MaxIndex(Tensor<cpu, 1, real_t> pred) {
  int maxidx = 0;
  for (index_t i = 1; i < pred.size(0); ++i) {
    if (pred[i] > pred[maxidx]) maxidx = (int)i;
  }
  return maxidx;
}

namespace mshadow {
namespace ps {
// a model updater is used when the update happens on the server side;
// since we only use the parameter server for sum aggregation here,
// it is not needed, but we must define this function to return NULL
template<>
IModelUpdater<float> *CreateModelUpdater(void) {
  return NULL;
}
}
}

template<typename xpu>
inline int Run(int argc, char *argv[]) {
  srand(0);
  // settings
  int batch_size = 100;
  int num_in = 28 * 28;
  int num_hidden = 100;
  int num_out = 10;
  int ndev = argc - 2;
  if (batch_size % ndev != 0) {
    fprintf(stderr, "choose a number of devices ndev such that batch_size (100) %% ndev == 0\n");
    return 0;
  }
  // parse the device list
  std::vector<int> devs;
  for (int i = 2; i < argc; ++i) {
    devs.push_back(atoi(argv[i]));
  }
  mshadow::ps::ISharedModel<xpu, real_t>
      *ps = mshadow::ps::CreateSharedModel<xpu, real_t>("local");
  ps->Init(devs);
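  // the "local" implementation keeps the shared model in-process and
  // aggregates gradients across the devices listed in devs; other
  // mshadow-ps backends can be substituted without changing the net code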

  std::vector<INNet *> nets(ndev);
  for (int i = 0; i < ndev; ++i) {
    mshadow::InitTensorEngine<xpu>(devs[i]);
    nets[i] = new NNet<xpu>(batch_size / ndev, num_in, num_hidden, num_out, devs[i], ps);
  }

  // labels
  std::vector<int> ytrain, ytest;
  // data
  TensorContainer<cpu, 2> xtrain, xtest;
  LoadMNIST("train-images-idx3-ubyte", "train-labels-idx1-ubyte", ytrain, xtrain, true);
  LoadMNIST("t10k-images-idx3-ubyte", "t10k-labels-idx1-ubyte", ytest, xtest, false);
  int num_iter = 20;

  for (int i = 0; i < num_iter; ++i) {
    // mini-batch size per device
    int step = batch_size / ndev;
    // run one thread per device; each thread works on its own slice of the batch
    #pragma omp parallel num_threads(ndev)
    {
      // temp output layer
      TensorContainer<cpu, 2, real_t> pred;
      pred.Resize(Shape2(step, num_out));
      int tid = omp_get_thread_num();
      mshadow::SetDevice<xpu>(devs[tid]);
      for (index_t j = 0; j + batch_size <= xtrain.size(0); j += batch_size) {
        nets[tid]->Forward(xtrain.Slice(j + tid * step, j + (tid + 1) * step), pred);
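        // Forward leaves softmax probabilities in pred; subtracting 1
        // at the true label turns them into the cross-entropy gradient
        // with respect to the pre-softmax outputs, i.e. p - y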
        // set gradient into pred
        for (int k = 0; k < step; ++k) {
          pred[k][ytrain[j + tid * step + k]] -= 1.0f;
        }
        // scale gradient by batch size
        pred *= 1.0f / batch_size;
        // run backprop
        nets[tid]->Backprop(pred);
      }
    }
    // evaluation
    long nerr = 0;
    #pragma omp parallel num_threads(ndev) reduction(+:nerr)
    {
      // temp output layer
      TensorContainer<cpu, 2, real_t> pred;
      pred.Resize(Shape2(step, num_out));
      int tid = omp_get_thread_num();
      mshadow::SetDevice<xpu>(devs[tid]);
      for (index_t j = 0; j + batch_size <= xtest.size(0); j += batch_size) {
        nets[tid]->Forward(xtest.Slice(j + tid * step, j + (tid + 1) * step), pred);
        for (int k = 0; k < step; ++k) {
          nerr += MaxIndex(pred[k]) != ytest[j + tid * step + k];
        }
      }
    }
    printf("round %d: test-err=%f\n", i, (float)nerr / xtest.size(0));
  }

  for (int i = 0; i < ndev; ++i) {
    mshadow::SetDevice<xpu>(devs[i]);
    delete nets[i];
    ShutdownTensorEngine<xpu>();
  }
  return 0;
}

int main(int argc, char *argv[]) {
  if (argc < 3) {
    printf("Usage: <device> devicelist\n"
           "\tExample1: ./nnet_ps cpu 1 2 3\n"
           "\tExample2: ./nnet_ps gpu 0 1\n");
    return 0;
  }
  if (!strcmp(argv[1], "cpu")) {
    Run<mshadow::cpu>(argc, argv);
  } else {
    Run<mshadow::gpu>(argc, argv);
  }
  return 0;
}