1 #if HAVE_CONFIG_H
2 # include "config.h"
3 #endif
4
5 extern void free(void *ptr);
6
7 #include "tcgmsgP.h"
8
/*
 * File-local truth values.  In this file they are the values written to
 * SendQEntry.active: NewSendQEntry() stores `true`, flush_send_q_node()
 * stores `false`.
 *
 * NOTE(review): these identifiers clash with the <stdbool.h> macros and are
 * keywords in C23, so this file cannot include <stdbool.h> as written.
 */
static const long false = 0L;
static const long true = 1L;
11
12 extern void Busy(int);
13
14 extern long async_send(SendQEntry *);
15
16
/**
 * Build a message id for a send addressed to `node`.
 *
 * A function-local counter is advanced on every call and confined to the
 * low 20 bits.  Zero is skipped, so the counter cycles through
 * 1 .. 2^20-1 and ids start repeating after 2^20-1 calls.  The node number
 * is placed above those 20 bits, which is what lets NodeFromMsgID()
 * recover it with a right shift.
 *
 * @param node  destination node number
 * @return      (node << 20) + counter
 */
static long NextMsgID(long node)
{
    enum { COUNTER_BITS = 20 };
    static long counter = 0;
    const long counter_mask = (1L << COUNTER_BITS) - 1;

    counter = (counter + 1) & counter_mask;
    if (!counter) {
        /* Wrapped around: zero is never handed out as a counter value. */
        counter = 1;
    }

    return (node << COUNTER_BITS) + counter;
}
31
32
/**
 * Recover the destination node from an id produced by NextMsgID().
 *
 * The node number sits above the 20-bit counter, so it is obtained by
 * shifting the id right by 20 bits.  Valid nodes are 0 .. NNODES_()-1;
 * any other value is reported through Error() with the offending msgid.
 *
 * @param msgid  id previously returned by NextMsgID()
 * @return       the node encoded in msgid
 */
static long NodeFromMsgID(long msgid)
{
    long node = msgid >> 20;

    /*
     * node == NNODES_() is one past the last valid node and must be
     * rejected as well: the result is used to index TCGMSG_proc_info[].
     */
    if (node < 0 || node >= NNODES_())
        Error("NodeFromMsgID: invalid msgid", msgid);

    return node;
}
45
46
47 /**
48 * Flush as many messages as possible without blocking from
49 * the send q to the specified node.
50 */
flush_send_q_node(long node)51 static void flush_send_q_node(long node)
52 {
53 while (TCGMSG_proc_info[node].sendq) {
54
55 if (!async_send(TCGMSG_proc_info[node].sendq)) {
56 /* Send is incomplete ... stop processing this q*/
57 break;
58 }
59 else {
60 SendQEntry *tmp = TCGMSG_proc_info[node].sendq;
61
62 TCGMSG_proc_info[node].sendq = (SendQEntry *) TCGMSG_proc_info[node].sendq->next;
63 if (tmp->free_buf_on_completion)
64 (void) free(tmp->buf);
65 tmp->active = false; /* Matches NewSendQEntry() */
66 }
67 }
68 }
69
70
71 /**
72 * Flush as many messages as possible without blocking
73 * from all of the send q's.
74 */
flush_send_q()75 void flush_send_q()
76 {
77 long node;
78 long nproc = NNODES_();
79
80 for (node=0; node<nproc; node++)
81 if (TCGMSG_proc_info[node].sendq)
82 flush_send_q_node(node);
83 }
84
85
86 /**
87 * Return 0 if the message operation is incomplete.
88 * Return 1 if the message operation is complete.
89 */
msg_status(long msgid)90 long msg_status(long msgid)
91 {
92 long node = NodeFromMsgID(msgid);
93 SendQEntry *entry;
94 long status = 1;
95
96 flush_send_q();
97
98 /* Attempt to find the msgid in the message q. If it is not
99 there then the send is complete */
100
101 for (entry=TCGMSG_proc_info[node].sendq; entry; entry=(SendQEntry *) entry->next) {
102 if (entry->msgid == msgid) {
103 status = 0;
104 break;
105 }
106 }
107
108 return status;
109 }
110
111
/**
 * Block until the operation referred to by msgid has completed.
 *
 * msg_status() is polled repeatedly.  After each of the first 999999
 * unsuccessful polls Busy(100) is called; after every later one the
 * process calls usleep(1) instead.
 *
 * @param msgid  id returned by msg_async_snd()
 */
void msg_wait(long msgid)
{
    const long spinlim = 1000000;
    long nspin = 0;

    while (!msg_status(msgid)) {
        nspin++;
        if (nspin >= spinlim) {
            usleep(1);
            continue;
        }
        Busy(100);
    }
}
128
129
NewSendQEntry(void)130 static SendQEntry *NewSendQEntry(void)
131 {
132 SendQEntry *new = TCGMSG_sendq_ring;
133
134 if (new->active)
135 Error("NewSendQEntry: too many outstanding sends\n", 0L);
136
137 TCGMSG_sendq_ring = (SendQEntry *) TCGMSG_sendq_ring->next_in_ring;
138
139 new->active = true;
140
141 return new;
142 }
143
144
msg_async_snd(long type,char * buf,long lenbuf,long node)145 long msg_async_snd(long type, char *buf, long lenbuf, long node)
146 {
147 long msgid;
148 SendQEntry *entry;
149
150 if (node<0 || node>=TCGMSG_nnodes)
151 Error("msg_async_send: node is out of range", node);
152
153 if (node == TCGMSG_nodeid)
154 Error("msg_async_send: cannot send to self", node);
155
156 msgid = NextMsgID(node);
157 entry = NewSendQEntry();
158
159 /* Insert a new entry into the q */
160
161 entry->tag = TCGMSG_proc_info[node].n_snd++; /* Increment tag */
162 entry->msgid = msgid;
163 entry->type = type;
164 entry->buf = buf;
165 entry->free_buf_on_completion = 0;
166 entry->lenbuf= lenbuf;
167 entry->node = node;
168 entry->next = (SendQEntry *) 0;
169 entry->written = 0;
170 entry->buffer_number = 0;
171
172 /* Attach to the send q */
173
174 if (!TCGMSG_proc_info[node].sendq)
175 TCGMSG_proc_info[node].sendq = entry;
176 else {
177 SendQEntry *cur = TCGMSG_proc_info[node].sendq;
178
179 while (cur->next)
180 cur = cur->next;
181 cur->next = entry;
182 }
183
184 /* Attempt to flush the send q */
185
186 flush_send_q();
187
188 return msgid;
189 }
190
191
/**
 * Synchronous send of a message to another process.
 *
 * Queues the message with msg_async_snd() and then waits in msg_wait()
 * until that send has completed.  All arguments are passed by value.
 *
 * @param type    user-defined integer message type (input)
 * @param buf     data buffer (input)
 * @param lenbuf  length of buffer in bytes (input)
 * @param node    node to send to (input)
 *
 * NOTE(review): the previous comment stated that for zero-length messages
 * only the header is sent; that is decided outside this function and is
 * not visible here.
 */
void msg_snd(long type, char *buf, long lenbuf, long node)
{
    long msgid = msg_async_snd(type, buf, lenbuf, node);

    msg_wait(msgid);
}
206