1 #if HAVE_CONFIG_H
2 #   include "config.h"
3 #endif
4 
5 extern void free(void *ptr);
6 
7 #include "tcgmsgP.h"
8 
/*
 * Boolean constants used for the SendQEntry 'active' flag.
 *
 * 'true' and 'false' are keywords in C23 and macros in <stdbool.h>, so
 * defining objects with those names does not compile with a C23 compiler.
 * Use the standard header when it is available (C99 and later) and fall
 * back to file-local constants only for older compilers.
 */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 199901L
#   include <stdbool.h>
#else
static const long false = 0;
static const long true  = 1;
#endif
11 
12 extern void Busy(int);
13 
14 extern long async_send(SendQEntry *);
15 
16 
/**
 * Given a node id return a unique integer constructed by combining it
 * with the value of a counter.
 *
 * The low 20 bits of the result hold a counter that cycles through
 * 1 .. 2^20-1 (it is never 0); the node id is placed in the bits above.
 */
static long NextMsgID(long node)
{
    enum { COUNTER_BITS = 20 };
    static long counter = 0;
    const long counter_max = (1L << COUNTER_BITS) - 1;

    /* Advance the counter, wrapping from the maximum back to 1 (skipping 0) */
    if (counter == counter_max)
        counter = 1;
    else
        counter = counter + 1;

    return (node << COUNTER_BITS) + counter;
}
31 
32 
/**
 * Given an id from NextMsgID extract the node.
 *
 * The node number is stored in the bits above the low 20 bits of the id.
 * Valid nodes are 0 .. NNODES_()-1; any other value is reported through
 * Error() together with the offending msgid.
 */
static long NodeFromMsgID(long msgid)
{
    long node = msgid >> 20;

    /* node == NNODES_() is one past the last valid node, so reject it too */
    if (node < 0 || node >= NNODES_())
        Error("NodeFromMsgID: invalid msgid", msgid);

    return node;
}
45 
46 
47 /**
48  * Flush as many messages as possible without blocking from
49  * the send q to the specified node.
50  */
flush_send_q_node(long node)51 static void flush_send_q_node(long node)
52 {
53     while (TCGMSG_proc_info[node].sendq) {
54 
55         if (!async_send(TCGMSG_proc_info[node].sendq)) {
56             /* Send is incomplete ... stop processing this q*/
57             break;
58         }
59         else {
60             SendQEntry *tmp = TCGMSG_proc_info[node].sendq;
61 
62             TCGMSG_proc_info[node].sendq = (SendQEntry *) TCGMSG_proc_info[node].sendq->next;
63             if (tmp->free_buf_on_completion)
64                 (void) free(tmp->buf);
65             tmp->active = false;    /* Matches NewSendQEntry() */
66         }
67     }
68 }
69 
70 
71 /**
72  * Flush as many messages as possible without blocking
73  * from all of the send q's.
74  */
flush_send_q()75 void flush_send_q()
76 {
77     long node;
78     long nproc = NNODES_();
79 
80     for (node=0; node<nproc; node++)
81         if (TCGMSG_proc_info[node].sendq)
82             flush_send_q_node(node);
83 }
84 
85 
86 /**
87  * Return 0 if the message operation is incomplete.
88  * Return 1 if the message operation is complete.
89  */
msg_status(long msgid)90 long msg_status(long msgid)
91 {
92     long node = NodeFromMsgID(msgid);
93     SendQEntry *entry;
94     long status = 1;
95 
96     flush_send_q();
97 
98     /* Attempt to find the msgid in the message q.  If it is not
99        there then the send is complete */
100 
101     for (entry=TCGMSG_proc_info[node].sendq; entry; entry=(SendQEntry *) entry->next) {
102         if (entry->msgid == msgid) {
103             status = 0;
104             break;
105         }
106     }
107 
108     return status;
109 }
110 
111 
/**
 * Wait for the operation referred to by msgid to complete.
 *
 * Polls msg_status() until it reports completion.  Between polls it calls
 * Busy(100) for the first spinlim-1 iterations and usleep(1) after that.
 */
void msg_wait(long msgid)
{
    const long spinlim = 1000000;
    long nspin = 0;

    while (!msg_status(msgid)) {
        /* Saturate the counter at spinlim so that an arbitrarily long wait
           can never overflow it (signed overflow is undefined behaviour) */
        if (nspin < spinlim)
            nspin++;

        if (nspin < spinlim)
            Busy(100);
        else
            usleep(1);
    }
}
128 
129 
NewSendQEntry(void)130 static SendQEntry *NewSendQEntry(void)
131 {
132     SendQEntry *new = TCGMSG_sendq_ring;
133 
134     if (new->active)
135         Error("NewSendQEntry: too many outstanding sends\n", 0L);
136 
137     TCGMSG_sendq_ring = (SendQEntry *) TCGMSG_sendq_ring->next_in_ring;
138 
139     new->active = true;
140 
141     return new;
142 }
143 
144 
msg_async_snd(long type,char * buf,long lenbuf,long node)145 long msg_async_snd(long type, char *buf, long lenbuf, long node)
146 {
147     long msgid;
148     SendQEntry *entry;
149 
150     if (node<0 || node>=TCGMSG_nnodes)
151         Error("msg_async_send: node is out of range", node);
152 
153     if (node == TCGMSG_nodeid)
154         Error("msg_async_send: cannot send to self", node);
155 
156     msgid = NextMsgID(node);
157     entry = NewSendQEntry();
158 
159     /* Insert a new entry into the q */
160 
161     entry->tag   = TCGMSG_proc_info[node].n_snd++; /* Increment tag */
162     entry->msgid = msgid;
163     entry->type  = type;
164     entry->buf   = buf;
165     entry->free_buf_on_completion = 0;
166     entry->lenbuf= lenbuf;
167     entry->node  = node;
168     entry->next  = (SendQEntry *) 0;
169     entry->written = 0;
170     entry->buffer_number = 0;
171 
172     /* Attach to the send q */
173 
174     if (!TCGMSG_proc_info[node].sendq)
175         TCGMSG_proc_info[node].sendq = entry;
176     else {
177         SendQEntry *cur = TCGMSG_proc_info[node].sendq;
178 
179         while (cur->next)
180             cur = cur->next;
181         cur->next = entry;
182     }
183 
184     /* Attempt to flush the send q */
185 
186     flush_send_q();
187 
188     return msgid;
189 }
190 
191 
/**
 * Synchronous send of a message to a process.
 *
 * long type     = user defined integer message type (input)
 * char *buf     = data buffer (input)
 * long lenbuf   = length of buffer in bytes (input)
 * long node     = node to send to (input)
 *
 * Implemented as an asynchronous send followed by a wait for its
 * completion.  For zero length messages only the header is sent.
 */
void msg_snd(long type, char *buf, long lenbuf, long node)
{
    long msgid = msg_async_snd(type, buf, lenbuf, node);

    msg_wait(msgid);
}
206