1 //
2 // messshm.cc
3 //
4 // Copyright (C) 1996 Limit Point Systems, Inc.
5 //
6 // Author: Curtis Janssen <cljanss@limitpt.com>
7 // Maintainer: LPS
8 //
9 // This file is part of the SC Toolkit.
10 //
11 // The SC Toolkit is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU Library General Public License as published by
13 // the Free Software Foundation; either version 2, or (at your option)
14 // any later version.
15 //
16 // The SC Toolkit is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Library General Public License for more details.
20 //
21 // You should have received a copy of the GNU Library General Public License
22 // along with the SC Toolkit; see the file COPYING.LIB. If not, write to
23 // the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 //
25 // The U.S. Government is granted a limited license as per AL 91-7.
26 //
27
28
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
34
35
36 #include <util/misc/bug.h>
37 #include <util/misc/formio.h>
38 #include <util/group/messshm.h>
39
40 using namespace std;
41 using namespace sc;
42
// Uncomment to enable the tracing output in basic_send()/basic_recv() and
// to compile the print_buffer() debugging helper.
//#define DEBUG

// Permission bits handed to semget() in initialize(int): 0200 is the owner
// write/alter bit and 0400 the owner read bit.  Not every platform's
// <sys/sem.h> defines these names, so supply the traditional octal values.
#ifndef SEM_A
# define SEM_A 0200
#endif

#ifndef SEM_R
# define SEM_R 0400
#endif

/* NALIGN is the byte boundary that we align data on.  It must be a power of
   two, because ROUNDUPTOALIGN clears the low bits with a mask. */
#define NALIGN 8
// Round n up to the next multiple of NALIGN (e.g. 13 -> 16, 16 -> 16).
// Used to place each message header on an aligned boundary.
#define ROUNDUPTOALIGN(n) (((n) + (NALIGN-1)) & ~(NALIGN-1))

// Register ShmMessageGrp with the SC class-description system under the
// name "ShmMessageGrp", version 1, with parent "public intMessageGrp".
// create<ShmMessageGrp> is presumably the factory used for KeyVal-driven
// construction -- confirm against the ClassDesc declaration.
static ClassDesc ShmMessageGrp_cd(
typeid(ShmMessageGrp),"ShmMessageGrp",1,"public intMessageGrp",
0, create<ShmMessageGrp>, 0);
60
ShmMessageGrp()61 ShmMessageGrp::ShmMessageGrp()
62 {
63 initialize();
64 }
65
ShmMessageGrp(int nprocs)66 ShmMessageGrp::ShmMessageGrp(int nprocs)
67 {
68 initialize(nprocs);
69 }
70
ShmMessageGrp(const Ref<KeyVal> & keyval)71 ShmMessageGrp::ShmMessageGrp(const Ref<KeyVal>& keyval):
72 intMessageGrp(keyval)
73 {
74 int nprocs = keyval->intvalue("n");
75 if (keyval->error() != KeyVal::OK) initialize();
76 else initialize(nprocs);
77 }
78
sync()79 void ShmMessageGrp::sync()
80 {
81 int i;
82 for (i=0; i<n(); i++) {
83 if (me() == i) continue;
84 wait_for_write(i);
85 commbuf[i]->n_sync++;
86 if (commbuf[i]->n_sync >= n()-1) {
87 while(commbuf[i]->n_wait_for_change) {
88 put_change(i);
89 commbuf[i]->n_wait_for_change--;
90 }
91 }
92 release_write(i);
93 }
94 wait_for_write(me());
95 while (commbuf[me()]->n_sync < n()-1) {
96 commbuf[me()]->n_wait_for_change++;
97 release_write(me());
98 get_change(me());
99 wait_for_write(me());
100 }
101 commbuf[me()]->n_sync -= n()-1;
102 while(commbuf[me()]->n_wait_for_change) {
103 put_change(me());
104 commbuf[me()]->n_wait_for_change--;
105 }
106 release_write(me());
107 }
108
~ShmMessageGrp()109 ShmMessageGrp::~ShmMessageGrp()
110 {
111 // sync the nodes
112 sync();
113
114 // make sure node zero is las to touch the shared memory
115 if (me() == 0) {
116 wait_for_write(0);
117 while (commbuf[0]->n_sync < n()-1) {
118 commbuf[0]->n_wait_for_change++;
119 release_write(0);
120 get_change(0);
121 wait_for_write(0);
122 }
123 release_write(0);
124 shmdt((SHMTYPE)sharedmem);
125 // release the memory
126 shmctl(shmid,IPC_RMID,0);
127
128 for (int i=0; i<n(); i++) {
129 #ifdef SEMCTL_REQUIRES_SEMUN
130 semun junk;
131 junk.val = 0;
132 #else
133 int junk = 0;
134 #endif
135 semctl(semid,i,IPC_RMID,junk);
136 semctl(change_semid,i,IPC_RMID,junk);
137 }
138 }
139 else {
140 wait_for_write(0);
141 commbuf[0]->n_sync++;
142 while(commbuf[0]->n_wait_for_change) {
143 put_change(0);
144 commbuf[0]->n_wait_for_change--;
145 }
146 shmdt((SHMTYPE)sharedmem);
147 release_write(0);
148 }
149 }
150
initialize()151 void ShmMessageGrp::initialize()
152 {
153 int nprocs = atoi(getenv("NUMPROC"));
154 if (!nprocs) nprocs = 1;
155 initialize(nprocs);
156 }
157
initialize(int nprocs)158 void ShmMessageGrp::initialize(int nprocs)
159 {
160 int i;
161 void* nextbuf;
162
163 semdec.sem_num = 0;
164 semdec.sem_op = -1;
165 semdec.sem_flg = 0;
166 seminc.sem_num = 0;
167 seminc.sem_op = 1;
168 seminc.sem_flg = 0;
169
170 // Each node gets a buffer for incoming data.
171 shmid = shmget(IPC_PRIVATE,
172 nprocs * sizeof(commbuf_t),
173 IPC_CREAT | SHM_R | SHM_W);
174
175 // Attach the shared segment.
176 nextbuf = sharedmem = shmat(shmid,0,0);
177
178 #ifdef SEMCTL_REQUIRES_SEMUN
179 semun semzero;
180 semzero.val = 0;
181 semun semone;
182 semone.val = 1;
183 #else
184 int semzero = 0;
185 int semone = 1;
186 #endif
187
188 // Each shared memory segment gets a semaphore to synchronize access.
189 semid = semget(IPC_PRIVATE,nprocs,IPC_CREAT | SEM_R | SEM_A );
190 if (semid == -1) {
191 perror("semget");
192 exit(-1);
193 }
194
195 change_semid = semget(IPC_PRIVATE,nprocs,IPC_CREAT | SEM_R | SEM_A );
196 if (change_semid == -1) {
197 perror("semget");
198 exit(-1);
199 }
200
201 for (i=0; i<nprocs; i++) {
202
203 // Mark all of the segments as available for writing.
204 if (semctl(semid,i,SETVAL,semone) == -1) {
205 perror("semctl");
206 exit(-1);
207 }
208
209 if (semctl(change_semid,i,SETVAL,semzero) == -1) {
210 perror("semctl");
211 exit(-1);
212 }
213
214 // Alloc shm for each node's commbuf.
215 commbuf[i] = (commbuf_t*) nextbuf;
216 // Mark the commbuf as having no messages.
217 commbuf[i]->nmsg = 0;
218 // and no outstanding waits
219 commbuf[i]->n_wait_for_change = 0;
220 commbuf[i]->n_sync = 0;
221 nextbuf = (void*)(((char*)nextbuf) + sizeof(commbuf_t));
222 }
223
224 // Create the remaining nodes.
225 int mynodeid = 0;
226 for (i=1; i<nprocs; i++) {
227 int pid = fork();
228 if (!pid) {
229 mynodeid = i;
230 break;
231 }
232 }
233
234 // Initialize the base class.
235 intMessageGrp::initialize(mynodeid, nprocs, 30);
236 }
237
238 Ref<MessageGrp>
clone(void)239 ShmMessageGrp::clone(void)
240 {
241 Ref<MessageGrp> smgrp = new ShmMessageGrp(n());
242 return smgrp;
243 }
244
basic_probe(int msgtype)245 int ShmMessageGrp::basic_probe(int msgtype)
246 {
247 int i;
248 msgbuf_t *message;
249
250 wait_for_write(me());
251
252 message = (msgbuf_t*)commbuf[me()]->buf;
253 for (i=0; i<commbuf[me()]->nmsg; i++) {
254 if ((msgtype == -1) || (msgtype == message->type)) {
255 release_write(me());
256 return 1;
257 }
258 message = NEXT_MESSAGE(message);
259 }
260
261 release_write(me());
262
263 return 0;
264 }
265
basic_recv(int type,void * buf,int bytes)266 void ShmMessageGrp::basic_recv(int type, void* buf, int bytes)
267 {
268 int size;
269 int i;
270 msgbuf_t *message,*lastmessage;
271
272 #ifdef DEBUG
273 ExEnv::outn() << "ShmGrp: basic_recv: "
274 << "type = " << type << ' '
275 << "buf = " << buf << ' '
276 << "bytes = " << bytes << ' '
277 << "me = " << me() << endl;
278 print_buffer(me(),me());
279 #endif
280
281 look_for_message:
282
283 wait_for_write(me());
284
285 message = (msgbuf_t*)commbuf[me()]->buf;
286 for (i=0; i<commbuf[me()]->nmsg; i++) {
287 if (message->type == type) break;
288 message = NEXT_MESSAGE(message);
289 }
290 if (i==commbuf[me()]->nmsg) {
291 commbuf[me()]->n_wait_for_change++;
292 release_write(me());
293 get_change(me());
294 goto look_for_message;
295 }
296
297 if (bytes < message->size) {
298 ExEnv::errn() << scprintf("messshm.cc: recv buffer too small\n");
299 abort();
300 }
301 if (bytes < message->size) size = bytes;
302 else size = message->size;
303
304 // Copy the data.
305 memcpy(buf,((char*)message) + sizeof(msgbuf_t),size);
306
307 // Find the lastmessage.
308 lastmessage = (msgbuf_t*)commbuf[me()]->buf;
309 for (i=0; i<commbuf[me()]->nmsg; i++) {
310 lastmessage = NEXT_MESSAGE(lastmessage);
311 }
312
313 // Repack the message buffer.
314 memmove(message,NEXT_MESSAGE(message),
315 (size_t)((char*)lastmessage)-(size_t)((char*)NEXT_MESSAGE(message)));
316
317 commbuf[me()]->nmsg--;
318
319 while(commbuf[me()]->n_wait_for_change) {
320 put_change(me());
321 commbuf[me()]->n_wait_for_change--;
322 }
323
324 release_write(me());
325 }
326
basic_send(int dest,int type,const void * buf,int bytes)327 void ShmMessageGrp::basic_send(int dest, int type, const void* buf, int bytes)
328 {
329 int i;
330 msgbuf_t *availmsg;
331
332 #ifdef DEBUG
333 ExEnv::outn() << "ShmGrp: basic_send: "
334 << "dest = " << dest << ' '
335 << "type = " << type << ' '
336 << "buf = " << buf << ' '
337 << "bytes = " << bytes << ' '
338 << "me = " << me() << endl;
339 #endif
340
341 if (dest>=n()) {
342 //debug_start("bad destination");
343 ExEnv::errn() << scprintf("ShmMessageGrp::basic_send: bad destination\n");
344 abort();
345 }
346
347 try_send_again:
348
349 // Obtain write access to the dest's incoming buffer.
350 wait_for_write(dest);
351
352 availmsg = (msgbuf_t*)commbuf[dest]->buf;
353 for (i=0; i<commbuf[dest]->nmsg; i++) {
354 availmsg = NEXT_MESSAGE(availmsg);
355 }
356 if ( (((char*)availmsg) + ROUNDUPTOALIGN(sizeof(msgbuf_t) + bytes))
357 > (((char*)commbuf[dest]) + sizeof(commbuf_t))) {
358 if (me() == dest) {
359 // sending a message to myself and the buffer is full
360 // --cannot recover
361 ExEnv::errn() << scprintf("commbuf size exceeded on %d\n",me());
362 ExEnv::errn() << scprintf(" availmsg = 0x%x\n",availmsg);
363 ExEnv::errn() << scprintf(" commbuf[%d] + sizeof(commbuf_t) = 0x%x\n",
364 dest,((char*)commbuf[dest]) + sizeof(commbuf_t));
365 ExEnv::errn() << scprintf(" size = %d\n",bytes);
366 abort();
367 }
368 else {
369 // try to recover from a full buffer by waiting for the dest
370 // to read some data.
371 commbuf[dest]->n_wait_for_change++;
372 release_write(dest);
373 get_change(dest);
374 goto try_send_again;
375 }
376 }
377 availmsg->from = me();
378 availmsg->type = type;
379 availmsg->size = bytes;
380 memcpy(((char*)availmsg) + sizeof(msgbuf_t),buf,bytes);
381 commbuf[dest]->nmsg++;
382
383 // let the dest know that there is more data in the buffer
384 while(commbuf[dest]->n_wait_for_change) {
385 put_change(dest);
386 commbuf[dest]->n_wait_for_change--;
387 }
388
389 // Release write access to the dest's buffer.
390 release_write(dest);
391 }
392
NEXT_MESSAGE(msgbuf_t * m)393 msgbuf_t * ShmMessageGrp::NEXT_MESSAGE(msgbuf_t *m)
394 {
395 msgbuf_t *r;
396 if (m->size < 0) {
397 ExEnv::errn() << scprintf("NEXT_MESSAGE: m->size = %d (real %d)\n",
398 m->size,sizeof(msgbuf_t) + m->size + m->size%8);
399 //debug_start("m->size < 0");
400 ExEnv::errn() << scprintf("messshm.cc: m->size < 0\n");
401 abort();
402 }
403 r = ((msgbuf_t*)(((char*)m) + ROUNDUPTOALIGN(sizeof(msgbuf_t) + m->size)));
404 return r;
405 }
406
get_change(int node)407 void ShmMessageGrp::get_change(int node)
408 {
409 semdec.sem_num = node;
410 semop(change_semid,&semdec,1);
411 semdec.sem_num = 0;
412 }
413
put_change(int node)414 void ShmMessageGrp::put_change(int node)
415 {
416 seminc.sem_num = node;
417 semop(change_semid,&seminc,1);
418 seminc.sem_num = 0;
419
420 }
421
422 // Obtain a lock for writing to the node's buffer.
wait_for_write(int node)423 void ShmMessageGrp::wait_for_write(int node)
424 {
425 semdec.sem_num = node;
426 semop(semid,&semdec,1);
427 semdec.sem_num = 0;
428 }
429
430 // Release lock for writing to node's buffer.
release_write(int node)431 void ShmMessageGrp::release_write(int node)
432 {
433 seminc.sem_num = node;
434 semop(semid,&seminc,1);
435 seminc.sem_num = 0;
436 }
437
#ifdef DEBUG
// Debugging aid (compiled only with DEBUG): print the header of every
// message queued in node's incoming buffer.  The me argument is the id of
// the calling node and is used only to label the output.  The buffer is
// read without taking node's write lock.
void ShmMessageGrp::print_buffer(int node, int me)
{
  commbuf_t *box = commbuf[node];
  msgbuf_t *msg = (msgbuf_t*)box->buf;

  ExEnv::outn() << scprintf("Printing buffer for node %d on node %d\n",node,me);
  for (int k = 0; k < box->nmsg; ++k) {
    ExEnv::outn()
      << scprintf(" on node %2d: to=%2d, bytes=%6d, type=%10d, from=%2d\n",
                  me, node, msg->size, msg->type, msg->from);
    ExEnv::outn().flush();
    msg = NEXT_MESSAGE(msg);
  }
}
#endif
460
461 /////////////////////////////////////////////////////////////////////////////
462
463 // Local Variables:
464 // mode: c++
465 // c-file-style: "CLJ"
466 // End:
467