1 #include <u.h>
2 #include <libc.h>
3 #include <venti.h>
4 #include "queue.h"
5 
6 long ventisendbytes, ventisendpackets;
7 long ventirecvbytes, ventirecvpackets;
8 
9 static int
_vtsend(VtConn * z,Packet * p)10 _vtsend(VtConn *z, Packet *p)
11 {
12 	IOchunk ioc;
13 	int n, tot;
14 	uchar buf[4];
15 
16 	if(z->state != VtStateConnected) {
17 		werrstr("session not connected");
18 		return -1;
19 	}
20 
21 	/* add framing */
22 	n = packetsize(p);
23 	if(z->version[1] == '2') {
24 		if(n >= (1<<16)) {
25 			werrstr("packet too large");
26 			packetfree(p);
27 			return -1;
28 		}
29 		buf[0] = n>>8;
30 		buf[1] = n;
31 		packetprefix(p, buf, 2);
32 		ventisendbytes += n+2;
33 	} else {
34 		buf[0] = n>>24;
35 		buf[1] = n>>16;
36 		buf[2] = n>>8;
37 		buf[3] = n;
38 		packetprefix(p, buf, 4);
39 		ventisendbytes += n+4;
40 	}
41 	ventisendpackets++;
42 
43 	tot = 0;
44 	for(;;){
45 		n = packetfragments(p, &ioc, 1, 0);
46 		if(n == 0)
47 			break;
48 		if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
49 			vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
50 			packetfree(p);
51 			return -1;
52 		}
53 		packetconsume(p, nil, ioc.len);
54 		tot += ioc.len;
55 	}
56 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
57 	packetfree(p);
58 	return 1;
59 }
60 
61 static int
interrupted(void)62 interrupted(void)
63 {
64 	char e[ERRMAX];
65 
66 	rerrstr(e, sizeof e);
67 	return strstr(e, "interrupted") != nil;
68 }
69 
70 
71 static Packet*
_vtrecv(VtConn * z)72 _vtrecv(VtConn *z)
73 {
74 	uchar buf[10], *b;
75 	int n, need;
76 	Packet *p;
77 	int size, len;
78 
79 	if(z->state != VtStateConnected) {
80 		werrstr("session not connected");
81 		return nil;
82 	}
83 
84 	p = z->part;
85 	/* get enough for head size */
86 	size = packetsize(p);
87 	need = z->version[1] - '0';	// 2 or 4
88 	while(size < need) {
89 		b = packettrailer(p, need);
90 		assert(b != nil);
91 		if(0) fprint(2, "%d read hdr\n", getpid());
92 		n = read(z->infd, b, need);
93 		if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
94 		if(n==0 || (n<0 && !interrupted()))
95 			goto Err;
96 		size += n;
97 		packettrim(p, 0, size);
98 	}
99 
100 	if(packetconsume(p, buf, need) < 0)
101 		goto Err;
102 	if(z->version[1] == '2') {
103 		len = (buf[0] << 8) | buf[1];
104 		size -= 2;
105 	} else {
106 		len = (buf[0]<<24) | (buf[1]<<16) | (buf[2]<<8) | buf[3];
107 		size -= 4;
108 	}
109 
110 	while(size < len) {
111 		n = len - size;
112 		if(n > MaxFragSize)
113 			n = MaxFragSize;
114 		b = packettrailer(p, n);
115 		if(0) fprint(2, "%d read body %d\n", getpid(), n);
116 		n = read(z->infd, b, n);
117 		if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
118 		if(n > 0)
119 			size += n;
120 		packettrim(p, 0, size);
121 		if(n==0 || (n<0 && !interrupted()))
122 			goto Err;
123 	}
124 	ventirecvbytes += len;
125 	ventirecvpackets++;
126 	p = packetsplit(p, len);
127 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
128 	return p;
129 Err:
130 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
131 	return nil;
132 }
133 
134 /*
135  * If you fork off two procs running vtrecvproc and vtsendproc,
136  * then vtrecv/vtsend (and thus vtrpc) will never block except on
137  * rendevouses, which is nice when it's running in one thread of many.
138  */
139 void
vtrecvproc(void * v)140 vtrecvproc(void *v)
141 {
142 	Packet *p;
143 	VtConn *z;
144 	Queue *q;
145 
146 	z = v;
147 	q = _vtqalloc();
148 
149 	qlock(&z->lk);
150 	z->readq = q;
151 	qlock(&z->inlk);
152 	rwakeup(&z->rpcfork);
153 	qunlock(&z->lk);
154 
155 	while((p = _vtrecv(z)) != nil)
156 		if(_vtqsend(q, p) < 0){
157 			packetfree(p);
158 			break;
159 		}
160 	qunlock(&z->inlk);
161 	qlock(&z->lk);
162 	_vtqhangup(q);
163 	while((p = _vtnbqrecv(q)) != nil)
164 		packetfree(p);
165 	_vtqdecref(q);
166 	z->readq = nil;
167 	rwakeup(&z->rpcfork);
168 	qunlock(&z->lk);
169 	vthangup(z);
170 }
171 
172 void
vtsendproc(void * v)173 vtsendproc(void *v)
174 {
175 	Queue *q;
176 	Packet *p;
177 	VtConn *z;
178 
179 	z = v;
180 	q = _vtqalloc();
181 
182 	qlock(&z->lk);
183 	z->writeq = q;
184 	qlock(&z->outlk);
185 	rwakeup(&z->rpcfork);
186 	qunlock(&z->lk);
187 
188 	while((p = _vtqrecv(q)) != nil)
189 		if(_vtsend(z, p) < 0)
190 			break;
191 	qunlock(&z->outlk);
192 	qlock(&z->lk);
193 	_vtqhangup(q);
194 	while((p = _vtnbqrecv(q)) != nil)
195 		packetfree(p);
196 	_vtqdecref(q);
197 	z->writeq = nil;
198 	rwakeup(&z->rpcfork);
199 	qunlock(&z->lk);
200 	return;
201 }
202 
203 Packet*
vtrecv(VtConn * z)204 vtrecv(VtConn *z)
205 {
206 	Packet *p;
207 	Queue *q;
208 
209 	qlock(&z->lk);
210 	if(z->state != VtStateConnected){
211 		werrstr("not connected");
212 		qunlock(&z->lk);
213 		return nil;
214 	}
215 	if(z->readq){
216 		q = _vtqincref(z->readq);
217 		qunlock(&z->lk);
218 		p = _vtqrecv(q);
219 		_vtqdecref(q);
220 		return p;
221 	}
222 
223 	qlock(&z->inlk);
224 	qunlock(&z->lk);
225 	p = _vtrecv(z);
226 	qunlock(&z->inlk);
227 	if(!p)
228 		vthangup(z);
229 	return p;
230 }
231 
232 int
vtsend(VtConn * z,Packet * p)233 vtsend(VtConn *z, Packet *p)
234 {
235 	Queue *q;
236 
237 	qlock(&z->lk);
238 	if(z->state != VtStateConnected){
239 		packetfree(p);
240 		werrstr("not connected");
241 		qunlock(&z->lk);
242 		return -1;
243 	}
244 	if(z->writeq){
245 		q = _vtqincref(z->writeq);
246 		qunlock(&z->lk);
247 		if(_vtqsend(q, p) < 0){
248 			_vtqdecref(q);
249 			packetfree(p);
250 			return -1;
251 		}
252 		_vtqdecref(q);
253 		return 0;
254 	}
255 
256 	qlock(&z->outlk);
257 	qunlock(&z->lk);
258 	if(_vtsend(z, p) < 0){
259 		qunlock(&z->outlk);
260 		vthangup(z);
261 		return -1;
262 	}
263 	qunlock(&z->outlk);
264 	return 0;
265 }
266