1 //
2 // messshm.cc
3 //
4 // Copyright (C) 1996 Limit Point Systems, Inc.
5 //
6 // Author: Curtis Janssen <cljanss@limitpt.com>
7 // Maintainer: LPS
8 //
9 // This file is part of the SC Toolkit.
10 //
11 // The SC Toolkit is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU Library General Public License as published by
13 // the Free Software Foundation; either version 2, or (at your option)
14 // any later version.
15 //
16 // The SC Toolkit is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 // GNU Library General Public License for more details.
20 //
21 // You should have received a copy of the GNU Library General Public License
22 // along with the SC Toolkit; see the file COPYING.LIB.  If not, write to
23 // the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 //
25 // The U.S. Government is granted a limited license as per AL 91-7.
26 //
27 
28 
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>


#include <util/misc/bug.h>
#include <util/misc/formio.h>
#include <util/group/messshm.h>
39 
40 using namespace std;
41 using namespace sc;
42 
43 //#define DEBUG
44 
/* Fallback definitions for SEM_A and SEM_R, used only when the system
   headers have not already defined them.  Both are passed as flag bits
   to semget() later in this file. */
#if !defined(SEM_A)
#  define SEM_A 0200
#endif

#if !defined(SEM_R)
#  define SEM_R 0400
#endif

/* Data placed in a message buffer is aligned on NALIGN-byte boundaries.
   ROUNDUPTOALIGN(n) yields the smallest multiple of NALIGN that is not
   less than n (for non-negative n).  NALIGN must be a power of two for
   the mask arithmetic to work. */
#define NALIGN 8
#define ROUNDUPTOALIGN(n) (((n) + (NALIGN - 1)) & ~(NALIGN - 1))
56 
57 static ClassDesc ShmMessageGrp_cd(
58   typeid(ShmMessageGrp),"ShmMessageGrp",1,"public intMessageGrp",
59   0, create<ShmMessageGrp>, 0);
60 
ShmMessageGrp()61 ShmMessageGrp::ShmMessageGrp()
62 {
63   initialize();
64 }
65 
ShmMessageGrp(int nprocs)66 ShmMessageGrp::ShmMessageGrp(int nprocs)
67 {
68   initialize(nprocs);
69 }
70 
ShmMessageGrp(const Ref<KeyVal> & keyval)71 ShmMessageGrp::ShmMessageGrp(const Ref<KeyVal>& keyval):
72   intMessageGrp(keyval)
73 {
74   int nprocs = keyval->intvalue("n");
75   if (keyval->error() != KeyVal::OK) initialize();
76   else initialize(nprocs);
77 }
78 
sync()79 void ShmMessageGrp::sync()
80 {
81   int i;
82   for (i=0; i<n(); i++) {
83       if (me() == i) continue;
84       wait_for_write(i);
85       commbuf[i]->n_sync++;
86       if (commbuf[i]->n_sync >= n()-1) {
87           while(commbuf[i]->n_wait_for_change) {
88               put_change(i);
89               commbuf[i]->n_wait_for_change--;
90             }
91         }
92       release_write(i);
93     }
94   wait_for_write(me());
95   while (commbuf[me()]->n_sync < n()-1) {
96       commbuf[me()]->n_wait_for_change++;
97       release_write(me());
98       get_change(me());
99       wait_for_write(me());
100     }
101   commbuf[me()]->n_sync -= n()-1;
102   while(commbuf[me()]->n_wait_for_change) {
103       put_change(me());
104       commbuf[me()]->n_wait_for_change--;
105     }
106   release_write(me());
107 }
108 
~ShmMessageGrp()109 ShmMessageGrp::~ShmMessageGrp()
110 {
111   // sync the nodes
112   sync();
113 
114   // make sure node zero is las to touch the shared memory
115   if (me() == 0) {
116       wait_for_write(0);
117       while (commbuf[0]->n_sync < n()-1) {
118           commbuf[0]->n_wait_for_change++;
119           release_write(0);
120           get_change(0);
121           wait_for_write(0);
122         }
123       release_write(0);
124       shmdt((SHMTYPE)sharedmem);
125       // release the memory
126       shmctl(shmid,IPC_RMID,0);
127 
128       for (int i=0; i<n(); i++) {
129 #ifdef SEMCTL_REQUIRES_SEMUN
130           semun junk;
131           junk.val = 0;
132 #else
133           int junk = 0;
134 #endif
135           semctl(semid,i,IPC_RMID,junk);
136           semctl(change_semid,i,IPC_RMID,junk);
137         }
138     }
139   else {
140       wait_for_write(0);
141       commbuf[0]->n_sync++;
142       while(commbuf[0]->n_wait_for_change) {
143           put_change(0);
144           commbuf[0]->n_wait_for_change--;
145         }
146       shmdt((SHMTYPE)sharedmem);
147       release_write(0);
148     }
149 }
150 
initialize()151 void ShmMessageGrp::initialize()
152 {
153   int nprocs = atoi(getenv("NUMPROC"));
154   if (!nprocs) nprocs = 1;
155   initialize(nprocs);
156 }
157 
initialize(int nprocs)158 void ShmMessageGrp::initialize(int nprocs)
159 {
160   int i;
161   void* nextbuf;
162 
163   semdec.sem_num = 0;
164   semdec.sem_op = -1;
165   semdec.sem_flg = 0;
166   seminc.sem_num = 0;
167   seminc.sem_op =  1;
168   seminc.sem_flg = 0;
169 
170   // Each node gets a buffer for incoming data.
171   shmid = shmget(IPC_PRIVATE,
172                  nprocs * sizeof(commbuf_t),
173                  IPC_CREAT | SHM_R | SHM_W);
174 
175   // Attach the shared segment.
176   nextbuf = sharedmem = shmat(shmid,0,0);
177 
178 #ifdef SEMCTL_REQUIRES_SEMUN
179   semun semzero;
180   semzero.val = 0;
181   semun semone;
182   semone.val = 1;
183 #else
184   int semzero = 0;
185   int semone = 1;
186 #endif
187 
188   // Each shared memory segment gets a semaphore to synchronize access.
189   semid = semget(IPC_PRIVATE,nprocs,IPC_CREAT | SEM_R | SEM_A );
190   if (semid == -1) {
191       perror("semget");
192       exit(-1);
193     }
194 
195   change_semid = semget(IPC_PRIVATE,nprocs,IPC_CREAT | SEM_R | SEM_A );
196   if (change_semid == -1) {
197       perror("semget");
198       exit(-1);
199     }
200 
201   for (i=0; i<nprocs; i++) {
202 
203       // Mark all of the segments as available for writing.
204       if (semctl(semid,i,SETVAL,semone) == -1) {
205           perror("semctl");
206           exit(-1);
207         }
208 
209       if (semctl(change_semid,i,SETVAL,semzero) == -1) {
210           perror("semctl");
211           exit(-1);
212         }
213 
214       // Alloc shm for each node's commbuf.
215       commbuf[i] = (commbuf_t*) nextbuf;
216       // Mark the commbuf as having no messages.
217       commbuf[i]->nmsg = 0;
218       // and no outstanding waits
219       commbuf[i]->n_wait_for_change = 0;
220       commbuf[i]->n_sync = 0;
221       nextbuf = (void*)(((char*)nextbuf) + sizeof(commbuf_t));
222     }
223 
224   // Create the remaining nodes.
225   int mynodeid = 0;
226   for (i=1; i<nprocs; i++) {
227       int pid = fork();
228       if (!pid) {
229           mynodeid = i;
230           break;
231         }
232     }
233 
234   // Initialize the base class.
235   intMessageGrp::initialize(mynodeid, nprocs, 30);
236 }
237 
238 Ref<MessageGrp>
clone(void)239 ShmMessageGrp::clone(void)
240 {
241   Ref<MessageGrp> smgrp = new ShmMessageGrp(n());
242   return smgrp;
243 }
244 
basic_probe(int msgtype)245 int ShmMessageGrp::basic_probe(int msgtype)
246 {
247   int i;
248   msgbuf_t *message;
249 
250   wait_for_write(me());
251 
252   message = (msgbuf_t*)commbuf[me()]->buf;
253   for (i=0; i<commbuf[me()]->nmsg; i++) {
254       if ((msgtype == -1) || (msgtype == message->type)) {
255           release_write(me());
256           return 1;
257         }
258       message = NEXT_MESSAGE(message);
259     }
260 
261   release_write(me());
262 
263   return 0;
264 }
265 
basic_recv(int type,void * buf,int bytes)266 void ShmMessageGrp::basic_recv(int type, void* buf, int bytes)
267 {
268   int size;
269   int i;
270   msgbuf_t *message,*lastmessage;
271 
272 #ifdef DEBUG
273   ExEnv::outn() << "ShmGrp: basic_recv: "
274        << "type = " << type << ' '
275        << "buf = " << buf << ' '
276        << "bytes = " << bytes << ' '
277        << "me = " << me() << endl;
278   print_buffer(me(),me());
279 #endif
280 
281   look_for_message:
282 
283   wait_for_write(me());
284 
285   message = (msgbuf_t*)commbuf[me()]->buf;
286   for (i=0; i<commbuf[me()]->nmsg; i++) {
287       if (message->type == type) break;
288       message = NEXT_MESSAGE(message);
289     }
290   if (i==commbuf[me()]->nmsg) {
291       commbuf[me()]->n_wait_for_change++;
292       release_write(me());
293       get_change(me());
294       goto look_for_message;
295     }
296 
297   if (bytes < message->size) {
298       ExEnv::errn() << scprintf("messshm.cc: recv buffer too small\n");
299       abort();
300     }
301   if (bytes < message->size) size = bytes;
302   else size = message->size;
303 
304   // Copy the data.
305   memcpy(buf,((char*)message) + sizeof(msgbuf_t),size);
306 
307   // Find the lastmessage.
308   lastmessage = (msgbuf_t*)commbuf[me()]->buf;
309   for (i=0; i<commbuf[me()]->nmsg; i++) {
310       lastmessage = NEXT_MESSAGE(lastmessage);
311     }
312 
313   // Repack the message buffer.
314   memmove(message,NEXT_MESSAGE(message),
315           (size_t)((char*)lastmessage)-(size_t)((char*)NEXT_MESSAGE(message)));
316 
317   commbuf[me()]->nmsg--;
318 
319   while(commbuf[me()]->n_wait_for_change) {
320       put_change(me());
321       commbuf[me()]->n_wait_for_change--;
322     }
323 
324   release_write(me());
325 }
326 
basic_send(int dest,int type,const void * buf,int bytes)327 void ShmMessageGrp::basic_send(int dest, int type, const void* buf, int bytes)
328 {
329   int i;
330   msgbuf_t *availmsg;
331 
332 #ifdef DEBUG
333   ExEnv::outn() << "ShmGrp: basic_send: "
334        << "dest = " << dest << ' '
335        << "type = " << type << ' '
336        << "buf = " << buf << ' '
337        << "bytes = " << bytes << ' '
338        << "me = " << me() << endl;
339 #endif
340 
341   if (dest>=n()) {
342       //debug_start("bad destination");
343       ExEnv::errn() << scprintf("ShmMessageGrp::basic_send: bad destination\n");
344       abort();
345     }
346 
347   try_send_again:
348 
349   // Obtain write access to the dest's incoming buffer.
350   wait_for_write(dest);
351 
352   availmsg = (msgbuf_t*)commbuf[dest]->buf;
353   for (i=0; i<commbuf[dest]->nmsg; i++) {
354       availmsg = NEXT_MESSAGE(availmsg);
355     }
356   if (  (((char*)availmsg) + ROUNDUPTOALIGN(sizeof(msgbuf_t) + bytes))
357         > (((char*)commbuf[dest]) + sizeof(commbuf_t))) {
358       if (me() == dest) {
359           // sending a message to myself and the buffer is full
360           // --cannot recover
361           ExEnv::errn() << scprintf("commbuf size exceeded on %d\n",me());
362           ExEnv::errn() << scprintf(" availmsg = 0x%x\n",availmsg);
363           ExEnv::errn() << scprintf(" commbuf[%d] + sizeof(commbuf_t) = 0x%x\n",
364                            dest,((char*)commbuf[dest]) + sizeof(commbuf_t));
365           ExEnv::errn() << scprintf(" size = %d\n",bytes);
366           abort();
367         }
368       else {
369           // try to recover from a full buffer by waiting for the dest
370           // to read some data.
371           commbuf[dest]->n_wait_for_change++;
372           release_write(dest);
373           get_change(dest);
374           goto try_send_again;
375         }
376     }
377   availmsg->from = me();
378   availmsg->type = type;
379   availmsg->size = bytes;
380   memcpy(((char*)availmsg) + sizeof(msgbuf_t),buf,bytes);
381   commbuf[dest]->nmsg++;
382 
383   // let the dest know that there is more data in the buffer
384   while(commbuf[dest]->n_wait_for_change) {
385       put_change(dest);
386       commbuf[dest]->n_wait_for_change--;
387     }
388 
389   // Release write access to the dest's buffer.
390   release_write(dest);
391 }
392 
NEXT_MESSAGE(msgbuf_t * m)393 msgbuf_t * ShmMessageGrp::NEXT_MESSAGE(msgbuf_t *m)
394 {
395   msgbuf_t *r;
396   if (m->size < 0) {
397       ExEnv::errn() << scprintf("NEXT_MESSAGE: m->size = %d (real %d)\n",
398                        m->size,sizeof(msgbuf_t) + m->size + m->size%8);
399       //debug_start("m->size < 0");
400       ExEnv::errn() << scprintf("messshm.cc: m->size < 0\n");
401       abort();
402     }
403   r = ((msgbuf_t*)(((char*)m) + ROUNDUPTOALIGN(sizeof(msgbuf_t) + m->size)));
404   return r;
405 }
406 
get_change(int node)407 void ShmMessageGrp::get_change(int node)
408 {
409   semdec.sem_num = node;
410   semop(change_semid,&semdec,1);
411   semdec.sem_num = 0;
412 }
413 
put_change(int node)414 void ShmMessageGrp::put_change(int node)
415 {
416   seminc.sem_num = node;
417   semop(change_semid,&seminc,1);
418   seminc.sem_num = 0;
419 
420 }
421 
422 // Obtain a lock for writing to the node's buffer.
wait_for_write(int node)423 void ShmMessageGrp::wait_for_write(int node)
424 {
425   semdec.sem_num = node;
426   semop(semid,&semdec,1);
427   semdec.sem_num = 0;
428 }
429 
430 // Release lock for writing to node's buffer.
release_write(int node)431 void ShmMessageGrp::release_write(int node)
432 {
433   seminc.sem_num = node;
434   semop(semid,&seminc,1);
435   seminc.sem_num = 0;
436 }
437 
#ifdef DEBUG
// Debug helper: prints one line for every message currently queued in
// the incoming buffer of `node`.  `me` is only used to label the output.
// The buffer is not locked here.
void ShmMessageGrp::print_buffer(int node, int me)
{
  msgbuf_t *cursor = (msgbuf_t*)commbuf[node]->buf;

  ExEnv::outn() << scprintf("Printing buffer for node %d on node %d\n",node,me);

  int k = 0;
  while (k < commbuf[node]->nmsg) {
      ExEnv::outn() <<
          scprintf(" on node %2d: to=%2d, bytes=%6d, type=%10d, from=%2d\n",
                   me,
                   node,
                   cursor->size,
                   cursor->type,
                   cursor->from);
      ExEnv::outn().flush();
      cursor = NEXT_MESSAGE(cursor);
      k++;
    }

}
#endif
460 
461 /////////////////////////////////////////////////////////////////////////////
462 
463 // Local Variables:
464 // mode: c++
465 // c-file-style: "CLJ"
466 // End:
467