1 #include <u.h>
2 #include <libc.h>
3 #include <venti.h>
4 #include "queue.h"
5
6 long ventisendbytes, ventisendpackets;
7 long ventirecvbytes, ventirecvpackets;
8
9 static int
_vtsend(VtConn * z,Packet * p)10 _vtsend(VtConn *z, Packet *p)
11 {
12 IOchunk ioc;
13 int n, tot;
14 uchar buf[4];
15
16 if(z->state != VtStateConnected) {
17 werrstr("session not connected");
18 return -1;
19 }
20
21 /* add framing */
22 n = packetsize(p);
23 if(z->version[1] == '2') {
24 if(n >= (1<<16)) {
25 werrstr("packet too large");
26 packetfree(p);
27 return -1;
28 }
29 buf[0] = n>>8;
30 buf[1] = n;
31 packetprefix(p, buf, 2);
32 ventisendbytes += n+2;
33 } else {
34 buf[0] = n>>24;
35 buf[1] = n>>16;
36 buf[2] = n>>8;
37 buf[3] = n;
38 packetprefix(p, buf, 4);
39 ventisendbytes += n+4;
40 }
41 ventisendpackets++;
42
43 tot = 0;
44 for(;;){
45 n = packetfragments(p, &ioc, 1, 0);
46 if(n == 0)
47 break;
48 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
49 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
50 packetfree(p);
51 return -1;
52 }
53 packetconsume(p, nil, ioc.len);
54 tot += ioc.len;
55 }
56 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
57 packetfree(p);
58 return 1;
59 }
60
61 static int
interrupted(void)62 interrupted(void)
63 {
64 char e[ERRMAX];
65
66 rerrstr(e, sizeof e);
67 return strstr(e, "interrupted") != nil;
68 }
69
70
71 static Packet*
_vtrecv(VtConn * z)72 _vtrecv(VtConn *z)
73 {
74 uchar buf[10], *b;
75 int n, need;
76 Packet *p;
77 int size, len;
78
79 if(z->state != VtStateConnected) {
80 werrstr("session not connected");
81 return nil;
82 }
83
84 p = z->part;
85 /* get enough for head size */
86 size = packetsize(p);
87 need = z->version[1] - '0'; // 2 or 4
88 while(size < need) {
89 b = packettrailer(p, need);
90 assert(b != nil);
91 if(0) fprint(2, "%d read hdr\n", getpid());
92 n = read(z->infd, b, need);
93 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
94 if(n==0 || (n<0 && !interrupted()))
95 goto Err;
96 size += n;
97 packettrim(p, 0, size);
98 }
99
100 if(packetconsume(p, buf, need) < 0)
101 goto Err;
102 if(z->version[1] == '2') {
103 len = (buf[0] << 8) | buf[1];
104 size -= 2;
105 } else {
106 len = (buf[0]<<24) | (buf[1]<<16) | (buf[2]<<8) | buf[3];
107 size -= 4;
108 }
109
110 while(size < len) {
111 n = len - size;
112 if(n > MaxFragSize)
113 n = MaxFragSize;
114 b = packettrailer(p, n);
115 if(0) fprint(2, "%d read body %d\n", getpid(), n);
116 n = read(z->infd, b, n);
117 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
118 if(n > 0)
119 size += n;
120 packettrim(p, 0, size);
121 if(n==0 || (n<0 && !interrupted()))
122 goto Err;
123 }
124 ventirecvbytes += len;
125 ventirecvpackets++;
126 p = packetsplit(p, len);
127 vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
128 return p;
129 Err:
130 vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
131 return nil;
132 }
133
134 /*
135 * If you fork off two procs running vtrecvproc and vtsendproc,
136 * then vtrecv/vtsend (and thus vtrpc) will never block except on
137 * rendevouses, which is nice when it's running in one thread of many.
138 */
139 void
vtrecvproc(void * v)140 vtrecvproc(void *v)
141 {
142 Packet *p;
143 VtConn *z;
144 Queue *q;
145
146 z = v;
147 q = _vtqalloc();
148
149 qlock(&z->lk);
150 z->readq = q;
151 qlock(&z->inlk);
152 rwakeup(&z->rpcfork);
153 qunlock(&z->lk);
154
155 while((p = _vtrecv(z)) != nil)
156 if(_vtqsend(q, p) < 0){
157 packetfree(p);
158 break;
159 }
160 qunlock(&z->inlk);
161 qlock(&z->lk);
162 _vtqhangup(q);
163 while((p = _vtnbqrecv(q)) != nil)
164 packetfree(p);
165 _vtqdecref(q);
166 z->readq = nil;
167 rwakeup(&z->rpcfork);
168 qunlock(&z->lk);
169 vthangup(z);
170 }
171
172 void
vtsendproc(void * v)173 vtsendproc(void *v)
174 {
175 Queue *q;
176 Packet *p;
177 VtConn *z;
178
179 z = v;
180 q = _vtqalloc();
181
182 qlock(&z->lk);
183 z->writeq = q;
184 qlock(&z->outlk);
185 rwakeup(&z->rpcfork);
186 qunlock(&z->lk);
187
188 while((p = _vtqrecv(q)) != nil)
189 if(_vtsend(z, p) < 0)
190 break;
191 qunlock(&z->outlk);
192 qlock(&z->lk);
193 _vtqhangup(q);
194 while((p = _vtnbqrecv(q)) != nil)
195 packetfree(p);
196 _vtqdecref(q);
197 z->writeq = nil;
198 rwakeup(&z->rpcfork);
199 qunlock(&z->lk);
200 return;
201 }
202
203 Packet*
vtrecv(VtConn * z)204 vtrecv(VtConn *z)
205 {
206 Packet *p;
207 Queue *q;
208
209 qlock(&z->lk);
210 if(z->state != VtStateConnected){
211 werrstr("not connected");
212 qunlock(&z->lk);
213 return nil;
214 }
215 if(z->readq){
216 q = _vtqincref(z->readq);
217 qunlock(&z->lk);
218 p = _vtqrecv(q);
219 _vtqdecref(q);
220 return p;
221 }
222
223 qlock(&z->inlk);
224 qunlock(&z->lk);
225 p = _vtrecv(z);
226 qunlock(&z->inlk);
227 if(!p)
228 vthangup(z);
229 return p;
230 }
231
232 int
vtsend(VtConn * z,Packet * p)233 vtsend(VtConn *z, Packet *p)
234 {
235 Queue *q;
236
237 qlock(&z->lk);
238 if(z->state != VtStateConnected){
239 packetfree(p);
240 werrstr("not connected");
241 qunlock(&z->lk);
242 return -1;
243 }
244 if(z->writeq){
245 q = _vtqincref(z->writeq);
246 qunlock(&z->lk);
247 if(_vtqsend(q, p) < 0){
248 _vtqdecref(q);
249 packetfree(p);
250 return -1;
251 }
252 _vtqdecref(q);
253 return 0;
254 }
255
256 qlock(&z->outlk);
257 qunlock(&z->lk);
258 if(_vtsend(z, p) < 0){
259 qunlock(&z->outlk);
260 vthangup(z);
261 return -1;
262 }
263 qunlock(&z->outlk);
264 return 0;
265 }
266