1 /* hiread.c - Hiquu I/O Engine Read Operations
2 * Copyright (c) 2006,2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved.
3 * This is confidential unpublished proprietary source code of the author.
4 * NO WARRANTY, not even implied warranties. Contains trade secrets.
5 * Distribution prohibited unless authorized in writing. See file COPYING.
6 * Special grant: hiread.c may be used with zxid open source project under
7 * same licensing terms as zxid itself.
8 * $Id$
9 *
10 * 15.4.2006, created over Easter holiday --Sampo
11 * 16.8.2012, modified license grant to allow use with ZXID.org --Sampo
12 * 19.8.2012, fixed serious free_pds manipulation bug in hi_pdu_alloc() --Sampo
13 * 6.9.2012, added support for TLS and SSL --Sampo
14 *
15 * Read more data to existing PDU (cur_pdu), or create new PDU and read data into it.
16 */
17
18 #include <pthread.h>
19 #include <memory.h>
20 #include <sys/types.h>
21 #include <sys/socket.h>
22 #include <errno.h>
23 #include <unistd.h>
24 #include <string.h>
25 #include "akbox.h"
26 #include "hiios.h"
27 #include "hiproto.h"
28 #include "errmac.h"
29
30 #include <zx/zx.h> /* for zx_report_openssl_err() */
31
/* Debug flag defined elsewhere; presumably consulted by the errmac logging macros (D(), ERR()). */
extern int errmac_debug;

/* Plain-text reply written to a peer whose SSL/TLS handshake failed on the very
 * first read, i.e. the other end apparently did not speak TLS/SSL.
 * It is laid out like a STOMP ERROR frame: command line, one header, blank line,
 * body, terminating NUL. The explicit trailing "\0" is part of the payload:
 * the sender transmits sizeof(SSL_ENCRYPTED_HINT)-1 bytes, which excludes only
 * the implicit C string terminator. */
#define SSL_ENCRYPTED_HINT \
  "ERROR\n" \
  "message:tls-needed\n" \
  "\n" \
  "TLS or SSL connection wanted but other end did not speak protocol.\n" \
  "\0"
35
36 /*() Allocate pdu. First allocation from per thread pool is
37 * attempted. This does not require any locking. If that does not
38 * work out, recourse to the shuffler level global pool, with locking, is made.
39 * locking:: takes shf->pdu_mut
40 * see also:: hi_pdu_free() */
41
42 /* Called by: */
hi_pdu_alloc(struct hi_thr * hit,const char * lk)43 struct hi_pdu* hi_pdu_alloc(struct hi_thr* hit, const char* lk)
44 {
45 struct hi_pdu* pdu;
46
47 HI_SANITY(hit->shf, hit);
48
49 if (hit->free_pdus) {
50 --hit->n_free_pdus;
51 pdu = hit->free_pdus;
52 hit->free_pdus = (struct hi_pdu*)pdu->qel.n;
53 D("%s: alloc pdu_%p thr n_free=%d", lk, pdu, hit->n_free_pdus);
54 ASSERTOPI(pdu->qel.intodo, ==, HI_INTODO_HIT_FREE);
55 ASSERT(pdu != hit->free_pdus);
56 goto retpdu;
57 }
58
59 LOCK(hit->shf->pdu_mut, "pdu_alloc");
60 if (hit->shf->free_pdus) {
61 pdu = hit->shf->free_pdus;
62 hit->shf->free_pdus = (struct hi_pdu*)pdu->qel.n;
63 D("%s: alloc pdu(%p) from shuff", lk, pdu);
64 ASSERT(pdu != hit->shf->free_pdus);
65 ASSERTOPI(pdu->qel.intodo, ==, HI_INTODO_SHF_FREE);
66 UNLOCK(hit->shf->pdu_mut, "pdu_alloc-ok");
67 goto retpdu;
68 }
69 UNLOCK(hit->shf->pdu_mut, "pdu_alloc-no-pdu");
70
71 ERR("Out of PDUs. Use -npdu (current is %d) to specify a value at least the value of 4x nfd + 10x nthr + 5 = %d, or even bigger if there are many pending messages waiting for delivery. (%s)", hit->shf->max_pdus, hit->shf->max_ios*4+hit->shf->nthr*10+5, lk);
72 return 0;
73
74 retpdu:
75 pdu->qel.intodo = HI_INTODO_PDUINUSE;
76 pdu->lim = pdu->mem + HI_PDU_MEM;
77 pdu->m = pdu->scan = pdu->ap = pdu->mem;
78 pdu->req = pdu->parent = pdu->subresps = pdu->reals = pdu->synths = 0;
79 pdu->fe = 0;
80 pdu->need = 1; /* trigger network I/O */
81 pdu->n = 0;
82 HI_SANITY(hit->shf, hit);
83 return pdu;
84 }
85
86 /*() Check if there is more (than need for cur_pdu) in the input buffer.
87 * Sometimes the input buffer contains more than
88 * a PDU's worth and therefore needs to be prepared
89 * as input for the next PDU.
90 * The req->need field has to accurately reflact the size of the PDU. Typically
91 * it is set in later stages of decoder.
92 * As hi_checkmore() will cause cur_pdu to change, it is common to call hi_add_reqs() */
93
94 /* Called by: hi_add_to_reqs */
hi_checkmore(struct hi_thr * hit,struct hi_io * io,struct hi_pdu * req,int minlen)95 static void hi_checkmore(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req, int minlen)
96 {
97 int n = req->ap - req->m;
98
99 /* Allocate next io->cur_pdu here
100 * a. because we already hold io->qel.mut (saves lock, otherwise hi_read() will alloc)
101 * b. because we might need to copy tail of previous req to it */
102
103 io->cur_pdu = hi_pdu_alloc(hit, "cur_pdu-ckm");
104 if (!io->cur_pdu) { hi_dump(hit->shf); NEVERNEVER("*** out of pdus in bad place %d", n); }
105 io->cur_pdu->fe = io;
106 io->cur_pdu->need = minlen;
107 ++io->n_pdu_in; /* stats */
108
109 D("Chkmore(%x) mn=%d n=%d req_%p->need=%d", io->fd, minlen, n, req, req->need);
110 ASSERT(minlen > 0); /* If this is ever zero it will prevent hi_poll() from producing. */
111 if (n > req->need) {
112 memcpy(io->cur_pdu->ap, req->m + req->need, n - req->need);
113 io->cur_pdu->ap += n - req->need;
114 req->ap = req->m + req->need; /* final length of decoded PDU */
115 }
116 }
117
118 /*() Add a PDU to the reqs associated with the io object.
119 * Often moving PDU to reqs means it should stop being cur_pdu.
120 * If minlen (protocol dependent PDU minimum length) is passed,
121 * hi_checkmore() processing is triggered and cur_pdu dealt with.
122 * Clearing cur_pdu is important to enable the hi_in_out() to
123 * admit new worker thread to perform read work.
124 * locking:: takes io->qel.mut
125 * see also:: hi_del_from_reqs() */
126
127 /* Called by: http_decode, stomp_decode, stomp_frame_err, test_ping */
hi_add_to_reqs(struct hi_thr * hit,struct hi_io * io,struct hi_pdu * req,int minlen)128 void hi_add_to_reqs(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req, int minlen)
129 {
130 LOCK(io->qel.mut, "add_to_reqs");
131 D("LOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
132 if (minlen) {
133 ASSERTOPP(io->cur_pdu, ==, req);
134 ASSERT(io->reading);
135 io->reading = 0; /* reading was set in hi_in_out() */
136 D("Out of reading(%x) n_thr=%d (reset cur_pdu)", io->fd, io->n_thr);
137 hi_checkmore(hit, io, req, minlen);
138 }
139 /* --io->n_thr; ASSERT(io->n_thr >= 0); N.B. dec n_thr for read is handled in hi_read() */
140 ASSERTOPP(req->fe, ==, io);
141 req->n = io->reqs;
142 io->reqs = req;
143 D("UNLOCK io(%x)->qel.thr=%lx req_%p", io->fd, (long)io->qel.mut.thr, req);
144 UNLOCK(io->qel.mut, "add_to_reqs");
145 }
146
147 /*() Read from the network, with all the repercussions.
148 * When entering, n_thr will have been incremented for the read count.
149 * No matter how many iterations may happen (and PDUs be processed),
150 * the n_thr count will be decremented by one in end of this
151 * function. There is no need to manipulate n_thr in decoders, but if
152 * decoder also engages in write, this is handled in hi_send0(). */
153
154 /* Called by: hi_in_out */
hi_read(struct hi_thr * hit,struct hi_io * io)155 int hi_read(struct hi_thr* hit, struct hi_io* io)
156 {
157 int ret,err;
158 struct hi_pdu* pdu;
159 while (1) { /* eagerly read until we exhaust the read (c.f. edge triggered epoll) */
160 ASSERT(io->reading);
161 pdu = io->cur_pdu;
162 D("loop(%x)->cur_pdu=%p ssl_%p", io->fd, pdu, io->ssl);
163 ASSERT(pdu); /* Exists either through hi_shuff_init() or through hi_check_more() */
164 retry:
165 D("read(%x) have=%d need=%d buf_avail=%d", io->fd, (int)(pdu->ap-pdu->m), pdu->need, (int)(pdu->lim-pdu->ap));
166 ASSERT(io->reading);
167 #ifdef USE_OPENSSL
168 if (io->ssl) {
169 ret = SSL_read(io->ssl, pdu->ap, pdu->lim - pdu->ap);
170 switch (err = SSL_get_error(io->ssl, ret)) {
171 case SSL_ERROR_NONE: break; /* Something read case */
172 case SSL_ERROR_WANT_READ:
173 D("SSL EAGAIN READ fd(%x)", io->fd); /* Comparable to EAGAIN. Should we remember which? */
174 //h->ioflags |= IO_MISSPOLL; /** Need more data after poll() **/
175 zx_report_openssl_err("SSL again read"); /* *** do we need this to clear error stack? */
176 goto eagain_out;
177 case SSL_ERROR_WANT_WRITE:
178 D("SSL EAGAIN WRITE fd(%x)", io->fd); /* Comparable to EAGAIN. Should we remember which? */
179 zx_report_openssl_err("SSL again write"); /* *** do we need this to clear error stack? */
180 goto eagain_out;
181 case SSL_ERROR_ZERO_RETURN: D("SSL EOF fd(%x)", io->fd); goto conn_close;
182 default:
183 ERR("SSL_read(%x) ret=%d err=%d", io->fd, ret, err);
184 zx_report_openssl_err("SSL_read");
185 if (!io->n_read) {
186 ERR("SSL Conn. failed(%x) ret=%d err=%d errno=%d %s", io->fd, ret, err, errno, STRERROR(errno));
187 if (errno != ECONNREFUSED)
188 write(io->fd, SSL_ENCRYPTED_HINT, sizeof(SSL_ENCRYPTED_HINT)-1);
189 }
190 /* N.B. A socket layer EOF not detected, but would be an I/O error from SSL perspective. */
191 goto conn_close;
192 }
193 } else
194 #endif
195 {
196 ret = read(io->fd&0x7fffffff, pdu->ap, pdu->lim - pdu->ap); /* *** vs. need */
197 switch (ret) {
198 case 0:
199 /* *** any provision to process still pending PDUs? */
200 D("EOF fd(%x)", io->fd);
201 goto conn_close;
202 case -1:
203 switch (errno) {
204 case EINTR: D("EINTR fd(%x)", io->fd); goto retry;
205 case EAGAIN: D("EAGAIN READ fd(%x)", io->fd); goto eagain_out; /* read(2) exhausted (c.f. edge triggered epoll) */
206 default:
207 ERR("read(%x) failed: %d %s (closing connection)", io->fd, errno, STRERROR(errno));
208 goto conn_close;
209 }
210 }
211 }
212 /* something was read, invoke PDU parsing layer */
213 D("got(%x)=%d, proto=%d cur_pdu(%p) need=%d", io->fd, ret, io->qel.proto, pdu, pdu->need);
214 HEXDUMP("got:", pdu->ap, pdu->ap + ret, /*16*/ 256);
215 pdu->ap += ret;
216 io->n_read += ret;
217 while (pdu
218 && pdu->need /* no further I/O desired */
219 && pdu->need <= (pdu->ap - pdu->m)) {
220 D("decode_loop io(%x)->cur_pdu=%p need=%d", io->fd, pdu, pdu?pdu->need:-99);
221 ASSERTOPP(pdu, ==, io->cur_pdu);
222 switch (io->qel.proto) {
223 /* DISPATCH: Following decoders MUST either
224 * a. drop connection in which case any rescheduling is moot
225 * b. consume cur_pdu (set it to 0 or new PDU). This will cause
226 * read to be consumed until exhausted, and later trigger new todo
227 * when there is more data to be had, see hi_poll()
228 * c. take some other action such as scheduling PDU to todo. Typically
229 * the req->need is zero when I/O is not expected. */
230 #ifdef ENA_S5066
231 case HIPROTO_SIS: if (sis_decode(hit, io)) goto conn_close; break; /* *** use ret */
232 case HIPROTO_DTS: if (dts_decode(hit, io)) goto conn_close; break; /* *** use ret */
233 #endif
234 case HIPROTO_HTTP: if (http_decode(hit, io)) goto conn_close; break; /* *** use ret */
235 //case HIPROTO_STOMP: if (stomp_decode(hit, io)) goto conn_close; break;
236 case HIPROTO_STOMP: ret = stomp_decode(hit, io); break;
237 case HIPROTO_TEST_PING: test_ping(hit, io); break; /* *** use ret */
238 case HIPROTO_SMTP: /* *** use ret */
239 if (io->qel.kind == HI_TCP_C) {
240 if (smtp_decode_resp(hit, io)) goto out;
241 } else {
242 if (smtp_decode_req(hit, io)) goto out;
243 }
244 break;
245 /* *** add here a project dependent include? Or a macro. */
246 default: NEVERNEVER("unknown proto(%x)", io->qel.proto);
247 }
248
249 /* Take another iteration. io->reading may have already been set (incompletely decoded
250 * PDU) or it may have been cleared (completely decoded PDU). Additional
251 * complication is that it may have been cleared during PDU processing, but
252 * then set again bu a different thread, such as second reader. *** */
253
254 switch (ret) {
255 case HI_NOERR: /* 0: In this case io->reading has been cleared at hi_add_req() due to
256 * completely decoded PDU. It may have been acquired by other thread. */
257 LOCK(io->qel.mut, "reset-reading");
258 D("LOCK io(%x)->qel.thr=%lx n_c/t=%d/%d", io->fd, (long)io->qel.mut.thr, io->n_close, io->n_thr);
259 if (io->reading) {
260 D("Somebody else already reading n_thr=%d", io->n_thr);
261 --io->n_thr; /* Remove read count. */
262 ASSERT(io->n_thr >= 0);
263 ASSERT(hit->cur_io == io);
264 ASSERT(hit->cur_n_close == io->n_close);
265 hit->cur_io = 0;
266 D("UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
267 UNLOCK(io->qel.mut, "reset-reading-abort");
268 return 0;
269 }
270 io->reading = 1; /* reacquire */
271 pdu = io->cur_pdu;
272 D("reacquired reading(%x) n_thr=%d", io->fd, io->n_thr);
273 D("UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
274 UNLOCK(io->qel.mut, "reset-reading");
275 break;
276 case HI_CONN_CLOSE: goto conn_close_not_reading;
277 case HI_NEED_MORE: /* 2: Incomplete decode, we still hold io->reading. */
278 ASSERT(io->reading);
279 break;
280 }
281 }
282 }
283
284 eagain_out:
285 /* A special problem with EAGAIN: read(2) is not guaranteed to arm edge triggered epoll(2)
286 * unless at least one EAGAIN read has happened. The problem is that as we are still
287 * in io->reading, if after this EAGAIN another thread polls and consumes from todo, it
288 * will not be able to read due to io->reading even though poll told it to read. After
289 * missing the opportunity, the next poll will not report fd anymore because no read has
290 * happened since previous report. Ouch!
291 * Solution attempt: if read was polled, but could not be served due to io->reading.
292 * the PDU is added back to the todo queue. This may cause the other thread to spin
293 * for a while, but at least things will move on eventually. */
294 out:
295 LOCK(io->qel.mut, "clear-reading");
296 D("RD-OUT: LOCK & UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
297 ASSERT(io->reading);
298 io->reading = 0;
299 --io->n_thr; /* Remove read count. */
300 ASSERT(hit->cur_io == io);
301 ASSERT(hit->cur_n_close == io->n_close);
302 hit->cur_io = 0;
303 D("out of reading(%x) n_thr=%d", io->fd, io->n_thr);
304 ASSERT(io->n_thr >= 0);
305 UNLOCK(io->qel.mut, "clear-reading");
306 return 0;
307
308 conn_close:
309 /* Connection close due to EOF, etc. We are still in io->reading. */
310 LOCK(io->qel.mut, "clear-reading-close");
311 D("RD-CLO: LOCK & UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
312 ASSERT(io->reading);
313 io->reading = 0;
314 --io->n_thr; /* Remove read count. */
315 ASSERT(io->n_thr >= 0);
316 D("out of reading(%x) n_thr=%d (close)", io->fd, io->n_thr);
317 UNLOCK(io->qel.mut, "clear-reading-close");
318 hi_close(hit, io, "hi_read"); /* will clear hit->cur_io */
319 return 1;
320
321 conn_close_not_reading:
322 /* Connection close after PDU has been read, e.g. protocol level DISCONNECT.
323 * We are no longer in io->reading (but other thread might be).
324 * io->n_thr still needs to be decremented. */
325 LOCK(io->qel.mut, "clear-reading-close");
326 D("RD-NRC: LOCK & UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
327 /*ASSERT(!io->reading); This can not be asserted as other thread might have gone to read. */
328 --io->n_thr; /* Remove read count. */
329 D("not_reading-close(%x) n_thr=%d r/w=%d/%d ev=%d", io->fd, io->n_thr, io->reading, io->writing, io->events);
330 ASSERT(io->n_thr >= 0);
331 UNLOCK(io->qel.mut, "not_reading-close");
332 hi_close(hit, io, "hi_read-no_reading"); /* will clear hit->cur_io */
333 return 1;
334 }
335
336 /* EOF -- hiread.c */
337
338 #if 0
339
340 N.B. Be careful to link against the same OpenSSL libraries as the headers. Otherwise
341 reference count errors may appear (at least with 1.0.1 headers against 1.0.0 libs).
342
343 Mystery SSL error (in the end caused by errors left in SSL stack by private key reading)
344
345 On LinuxMint 12 / Ubuntu / Debian
346 tb784d6e0 hiios.c:201 hi_new_shuffler zx d OpenSSL header-version(1000005f) lib-version(OpenSSL 1.0.0e 6 Sep 2011) cflags(compiler: cc -fPIC -DOPENSSL_PIC -DZLIB -DOPENSSL_THREADS -D_REENTRANT -DDSO_DLFCN -DHAVE_DLFCN_H -DL_ENDIAN -DTERMIO -O3 -Wa,--noexecstack -g -Wall) builton(built on: Thu Feb 9 00:57:05 UTC 2012) platform(platform: debian-i386) dir(OPENSSLDIR: "/usr/lib/ssl")
347
348 // On first connection, after several successful reads and writes
349
350 tb784d6e0 hiread.c:164 hi_read zx d read(9) have=0 need=6 buf_avail=3072
351 tb784d6e0 hiread.c:182 hi_read zx E SSL_read ret=-1 err=1
352 tb784d6e0 zxsig.c:404 zx_report_openssl_err zx E SSL_read: OpenSSL error(218529960) error:0D0680A8:asn1 encoding routines:ASN1_CHECK_TLEN:wrong tag (tasn_dec.c:1319): ? 0
353 tb784d6e0 zxsig.c:404 zx_report_openssl_err zx E SSL_read: OpenSSL error(218546234) error:0D06C03A:asn1 encoding routines:ASN1_D2I_EX_PRIMITIVE:nested asn1 error (tasn_dec.c:831): ? 0
354 tb784d6e0 zxsig.c:404 zx_report_openssl_err zx E SSL_read: OpenSSL error(218640442) error:0D08303A:asn1 encoding routines:ASN1_TEMPLATE_NOEXP_D2I:nested asn1 error (tasn_dec.c:751): Field=n, Type=RSA 3
355 tb784d6e0 zxsig.c:404 zx_report_openssl_err zx E SSL_read: OpenSSL error(67710980) error:04093004:rsa routines:OLD_RSA_PRIV_DECODE:RSA lib (rsa_ameth.c:115): ? 0
356
357 // On subsequent connections
358
359 tb784d6e0 hiread.c:164 hi_read zx d read(a) have=0 need=6 buf_avail=3072
360 tb784d6e0 hiios.c:116 zxbus_info_cb zx d SSL3 alert write:fatal:protocol version
361
362 tb784d6e0 hiread.c:182 hi_read zx E SSL_read ret=-1 err=1
363 tb784d6e0 zxsig.c:404 zx_report_openssl_err zx E SSL_read: OpenSSL error(336130315) error:1408F10B:SSL routines:SSL3_GET_RECORD:wrong version number (s3_pkt.c:339): ? 0
364
365 // Someone on the net has a similar problem with postfix mail
366
367 http://old.nabble.com/error%3A1408F10B%3ASSL-routines%3ASSL3_GET_RECORD%3Awrong-version-number-td33867821.html
368
369 Known issue (20120518)
370
371 http://cvs.openssl.org/chngview?cn=22565
372
373 openssl/ssl/s3_pkt.c 1.72.2.7.2.15 -> 1.72.2.7.2.16
374
375 --- s3_pkt.c 2012/04/17 13:20:19 1.72.2.7.2.15
376 +++ s3_pkt.c 2012/05/11 13:32:26 1.72.2.7.2.16
377 @@ -744,6 +744,7 @@
378 * bytes and record version number > TLS 1.0
379 */
380 if (s->state == SSL3_ST_CW_CLNT_HELLO_B
381 + && !s->renegotiate
382 && TLS1_get_version(s) > TLS1_VERSION)
383 *(p++) = 0x1;
384 else
385
386
387 #endif
388