1 /* hiread.c  -  Hiquu I/O Engine Read Operations
2  * Copyright (c) 2006,2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved.
3  * This is confidential unpublished proprietary source code of the author.
4  * NO WARRANTY, not even implied warranties. Contains trade secrets.
5  * Distribution prohibited unless authorized in writing. See file COPYING.
6  * Special grant: hiread.c may be used with zxid open source project under
7  * same licensing terms as zxid itself.
8  * $Id$
9  *
10  * 15.4.2006, created over Easter holiday --Sampo
11  * 16.8.2012, modified license grant to allow use with ZXID.org --Sampo
12  * 19.8.2012, fixed serious free_pds manipulation bug in hi_pdu_alloc() --Sampo
13  * 6.9.2012,  added support for TLS and SSL --Sampo
14  *
15  * Read more data to existing PDU (cur_pdu), or create new PDU and read data into it.
16  */
17 
18 #include <pthread.h>
19 #include <memory.h>
20 #include <sys/types.h>
21 #include <sys/socket.h>
22 #include <errno.h>
23 #include <unistd.h>
24 #include <string.h>
25 #include "akbox.h"
26 #include "hiios.h"
27 #include "hiproto.h"
28 #include "errmac.h"
29 
30 #include <zx/zx.h>  /* for zx_report_openssl_err() */
31 
32 extern int errmac_debug;
33 
/* Plain-text message that hi_read() writes back to the peer when the very first
 * SSL_read() on a connection fails, i.e. when the other end apparently does not
 * speak TLS/SSL. It appears to be laid out like a STOMP ERROR frame (command line,
 * header line, blank line, body) and it ends in an explicit NUL. The writer sends
 * sizeof(SSL_ENCRYPTED_HINT)-1 bytes, which drops only the implicit C string
 * terminator, so the explicit NUL is transmitted as the frame terminator. */
34 #define SSL_ENCRYPTED_HINT "ERROR\nmessage:tls-needed\n\nTLS or SSL connection wanted but other end did not speak protocol.\n\0"
35 
36 /*() Allocate pdu.  First allocation from per thread pool is
37  * attempted. This does not require any locking.  If that does not
38  * work out, recourse to the shuffler level global pool, with locking, is made.
39  * locking:: takes shf->pdu_mut
40  * see also:: hi_pdu_free() */
41 
42 /* Called by: */
hi_pdu_alloc(struct hi_thr * hit,const char * lk)43 struct hi_pdu* hi_pdu_alloc(struct hi_thr* hit, const char* lk)
44 {
45   struct hi_pdu* pdu;
46 
47   HI_SANITY(hit->shf, hit);
48 
49   if (hit->free_pdus) {
50     --hit->n_free_pdus;
51     pdu = hit->free_pdus;
52     hit->free_pdus = (struct hi_pdu*)pdu->qel.n;
53     D("%s: alloc pdu_%p thr n_free=%d", lk, pdu, hit->n_free_pdus);
54     ASSERTOPI(pdu->qel.intodo, ==, HI_INTODO_HIT_FREE);
55     ASSERT(pdu != hit->free_pdus);
56     goto retpdu;
57   }
58 
59   LOCK(hit->shf->pdu_mut, "pdu_alloc");
60   if (hit->shf->free_pdus) {
61     pdu = hit->shf->free_pdus;
62     hit->shf->free_pdus = (struct hi_pdu*)pdu->qel.n;
63     D("%s: alloc pdu(%p) from shuff", lk, pdu);
64     ASSERT(pdu != hit->shf->free_pdus);
65     ASSERTOPI(pdu->qel.intodo, ==, HI_INTODO_SHF_FREE);
66     UNLOCK(hit->shf->pdu_mut, "pdu_alloc-ok");
67     goto retpdu;
68   }
69   UNLOCK(hit->shf->pdu_mut, "pdu_alloc-no-pdu");
70 
71   ERR("Out of PDUs. Use -npdu (current is %d) to specify a value at least the value of 4x nfd + 10x nthr + 5 = %d, or even bigger if there are many pending messages waiting for delivery. (%s)", hit->shf->max_pdus, hit->shf->max_ios*4+hit->shf->nthr*10+5, lk);
72   return 0;
73 
74  retpdu:
75   pdu->qel.intodo = HI_INTODO_PDUINUSE;
76   pdu->lim = pdu->mem + HI_PDU_MEM;
77   pdu->m = pdu->scan = pdu->ap = pdu->mem;
78   pdu->req = pdu->parent = pdu->subresps = pdu->reals = pdu->synths = 0;
79   pdu->fe = 0;
80   pdu->need = 1;  /* trigger network I/O */
81   pdu->n = 0;
82   HI_SANITY(hit->shf, hit);
83   return pdu;
84 }
85 
86 /*() Check if there is more (than need for cur_pdu) in the input buffer.
87  * Sometimes the input buffer contains more than
88  * a PDU's worth and therefore needs to be prepared
89  * as input for the next PDU.
90  * The req->need field has to accurately reflact the size of the PDU. Typically
91  * it is set in later stages of decoder.
92  * As hi_checkmore() will cause cur_pdu to change, it is common to call hi_add_reqs() */
93 
94 /* Called by:  hi_add_to_reqs */
hi_checkmore(struct hi_thr * hit,struct hi_io * io,struct hi_pdu * req,int minlen)95 static void hi_checkmore(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req, int minlen)
96 {
97   int n = req->ap - req->m;
98 
99   /* Allocate next io->cur_pdu here
100    * a. because we already hold io->qel.mut (saves lock, otherwise hi_read() will alloc)
101    * b. because we might need to copy tail of previous req to it */
102 
103   io->cur_pdu = hi_pdu_alloc(hit, "cur_pdu-ckm");
104   if (!io->cur_pdu) {  hi_dump(hit->shf); NEVERNEVER("*** out of pdus in bad place %d", n); }
105   io->cur_pdu->fe = io;
106   io->cur_pdu->need = minlen;
107   ++io->n_pdu_in;   /* stats */
108 
109   D("Chkmore(%x) mn=%d n=%d req_%p->need=%d", io->fd, minlen, n, req, req->need);
110   ASSERT(minlen > 0);  /* If this is ever zero it will prevent hi_poll() from producing. */
111   if (n > req->need) {
112     memcpy(io->cur_pdu->ap, req->m + req->need, n - req->need);
113     io->cur_pdu->ap += n - req->need;
114     req->ap = req->m + req->need;      /* final length of decoded PDU */
115   }
116 }
117 
118 /*() Add a PDU to the reqs associated with the io object.
119  * Often moving PDU to reqs means it should stop being cur_pdu.
120  * If minlen (protocol dependent PDU minimum length) is passed,
121  * hi_checkmore() processing is triggered and cur_pdu dealt with.
122  * Clearing cur_pdu is important to enable the hi_in_out() to
123  * admit new worker thread to perform read work.
124  * locking:: takes io->qel.mut
125  * see also:: hi_del_from_reqs() */
126 
127 /* Called by:  http_decode, stomp_decode, stomp_frame_err, test_ping */
hi_add_to_reqs(struct hi_thr * hit,struct hi_io * io,struct hi_pdu * req,int minlen)128 void hi_add_to_reqs(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req, int minlen)
129 {
130   LOCK(io->qel.mut, "add_to_reqs");
131   D("LOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
132   if (minlen) {
133     ASSERTOPP(io->cur_pdu, ==, req);
134     ASSERT(io->reading);
135     io->reading = 0;  /* reading was set in hi_in_out() */
136     D("Out of reading(%x) n_thr=%d (reset cur_pdu)", io->fd, io->n_thr);
137     hi_checkmore(hit, io, req, minlen);
138   }
139   /* --io->n_thr; ASSERT(io->n_thr >= 0);  N.B. dec n_thr for read is handled in hi_read() */
140   ASSERTOPP(req->fe, ==, io);
141   req->n = io->reqs;
142   io->reqs = req;
143   D("UNLOCK io(%x)->qel.thr=%lx req_%p", io->fd, (long)io->qel.mut.thr, req);
144   UNLOCK(io->qel.mut, "add_to_reqs");
145 }
146 
147 /*() Read from the network, with all the repercussions.
148  * When entering, n_thr will have been incremented for the read count.
149  * No matter how many iterations may happen (and PDUs be processed),
150  * the n_thr count will be decremented by one in end of this
151  * function. There is no need to manipulate n_thr in decoders, but if
152  * decoder also engages in write, this is handled in hi_send0(). */
153 
154 /* Called by:  hi_in_out */
hi_read(struct hi_thr * hit,struct hi_io * io)155 int hi_read(struct hi_thr* hit, struct hi_io* io)
156 {
157   int ret,err;
158   struct hi_pdu* pdu;
159   while (1) {  /* eagerly read until we exhaust the read (c.f. edge triggered epoll) */
160     ASSERT(io->reading);
161     pdu = io->cur_pdu;
162     D("loop(%x)->cur_pdu=%p ssl_%p", io->fd, pdu, io->ssl);
163     ASSERT(pdu);  /* Exists either through hi_shuff_init() or through hi_check_more() */
164   retry:
165     D("read(%x) have=%d need=%d buf_avail=%d", io->fd, (int)(pdu->ap-pdu->m), pdu->need, (int)(pdu->lim-pdu->ap));
166     ASSERT(io->reading);
167 #ifdef USE_OPENSSL
168     if (io->ssl) {
169       ret = SSL_read(io->ssl, pdu->ap, pdu->lim - pdu->ap);
170       switch (err = SSL_get_error(io->ssl, ret)) {
171       case SSL_ERROR_NONE: break; /* Something read case */
172       case SSL_ERROR_WANT_READ:
173 	D("SSL EAGAIN READ fd(%x)", io->fd); /* Comparable to EAGAIN. Should we remember which? */
174 	//h->ioflags |= IO_MISSPOLL; /** Need more data after poll() **/
175 	zx_report_openssl_err("SSL again read"); /* *** do we need this to clear error stack? */
176 	goto eagain_out;
177       case SSL_ERROR_WANT_WRITE:
178 	D("SSL EAGAIN WRITE fd(%x)", io->fd); /* Comparable to EAGAIN. Should we remember which? */
179 	zx_report_openssl_err("SSL again write"); /* *** do we need this to clear error stack? */
180 	goto eagain_out;
181       case SSL_ERROR_ZERO_RETURN: D("SSL EOF fd(%x)", io->fd);	goto conn_close;
182       default:
183 	ERR("SSL_read(%x) ret=%d err=%d", io->fd, ret, err);
184 	zx_report_openssl_err("SSL_read");
185 	if (!io->n_read) {
186 	  ERR("SSL Conn. failed(%x) ret=%d err=%d errno=%d %s", io->fd, ret, err, errno, STRERROR(errno));
187 	  if (errno != ECONNREFUSED)
188 	    write(io->fd, SSL_ENCRYPTED_HINT, sizeof(SSL_ENCRYPTED_HINT)-1);
189 	}
190 	/* N.B. A socket layer EOF not detected, but would be an I/O error from SSL perspective. */
191 	goto conn_close;
192       }
193     } else
194 #endif
195     {
196       ret = read(io->fd&0x7fffffff, pdu->ap, pdu->lim - pdu->ap); /* *** vs. need */
197       switch (ret) {
198       case 0:
199 	/* *** any provision to process still pending PDUs? */
200 	D("EOF fd(%x)", io->fd);
201 	goto conn_close;
202       case -1:
203 	switch (errno) {
204 	case EINTR:  D("EINTR fd(%x)", io->fd); goto retry;
205 	case EAGAIN: D("EAGAIN READ fd(%x)", io->fd); goto eagain_out;  /* read(2) exhausted (c.f. edge triggered epoll) */
206 	default:
207 	  ERR("read(%x) failed: %d %s (closing connection)", io->fd, errno, STRERROR(errno));
208 	  goto conn_close;
209 	}
210       }
211     }
212     /* something was read, invoke PDU parsing layer */
213     D("got(%x)=%d, proto=%d cur_pdu(%p) need=%d", io->fd, ret, io->qel.proto, pdu, pdu->need);
214     HEXDUMP("got:", pdu->ap, pdu->ap + ret, /*16*/ 256);
215     pdu->ap += ret;
216     io->n_read += ret;
217     while (pdu
218 	   && pdu->need   /* no further I/O desired */
219 	   && pdu->need <= (pdu->ap - pdu->m)) {
220       D("decode_loop io(%x)->cur_pdu=%p need=%d", io->fd, pdu, pdu?pdu->need:-99);
221       ASSERTOPP(pdu, ==, io->cur_pdu);
222       switch (io->qel.proto) {
223 	/* DISPATCH: Following decoders MUST either
224 	 * a. drop connection in which case any rescheduling is moot
225 	 * b. consume cur_pdu (set it to 0 or new PDU). This will cause
226 	 *    read to be consumed until exhausted, and later trigger new todo
227 	 *    when there is more data to be had, see hi_poll()
228 	 * c. take some other action such as scheduling PDU to todo. Typically
229 	 *    the req->need is zero when I/O is not expected.	   */
230 #ifdef ENA_S5066
231       case HIPROTO_SIS:   if (sis_decode(hit, io))   goto conn_close;  break; /* *** use ret */
232       case HIPROTO_DTS:   if (dts_decode(hit, io))   goto conn_close;  break; /* *** use ret */
233 #endif
234       case HIPROTO_HTTP:  if (http_decode(hit, io))  goto conn_close;  break; /* *** use ret */
235 	//case HIPROTO_STOMP: if (stomp_decode(hit, io)) goto conn_close;  break;
236       case HIPROTO_STOMP: ret = stomp_decode(hit, io);  break;
237       case HIPROTO_TEST_PING: test_ping(hit, io);  break; /* *** use ret */
238       case HIPROTO_SMTP: /* *** use ret */
239 	if (io->qel.kind == HI_TCP_C) {
240 	  if (smtp_decode_resp(hit, io))  goto out;
241 	} else {
242 	  if (smtp_decode_req(hit, io))   goto out;
243 	}
244 	break;
245 	/* *** add here a project dependent include? Or a macro. */
246       default: NEVERNEVER("unknown proto(%x)", io->qel.proto);
247       }
248 
249       /* Take another iteration. io->reading may have already been set (incompletely decoded
250        * PDU) or it may have been cleared (completely decoded PDU). Additional
251        * complication is that it may have been cleared during PDU processing, but
252        * then set again bu a different thread, such as second reader. *** */
253 
254       switch (ret) {
255       case HI_NOERR: /* 0: In this case io->reading has been cleared at hi_add_req() due to
256 		      * completely decoded PDU. It may have been acquired by other thread. */
257 	LOCK(io->qel.mut, "reset-reading");
258 	D("LOCK io(%x)->qel.thr=%lx n_c/t=%d/%d", io->fd, (long)io->qel.mut.thr, io->n_close, io->n_thr);
259 	if (io->reading) {
260 	  D("Somebody else already reading n_thr=%d", io->n_thr);
261 	  --io->n_thr;              /* Remove read count. */
262 	  ASSERT(io->n_thr >= 0);
263 	  ASSERT(hit->cur_io == io);
264 	  ASSERT(hit->cur_n_close == io->n_close);
265 	  hit->cur_io = 0;
266 	  D("UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
267 	  UNLOCK(io->qel.mut, "reset-reading-abort");
268 	  return 0;
269 	}
270 	io->reading = 1;  /* reacquire */
271 	pdu = io->cur_pdu;
272 	D("reacquired reading(%x) n_thr=%d", io->fd, io->n_thr);
273 	D("UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
274 	UNLOCK(io->qel.mut, "reset-reading");
275 	break;
276       case HI_CONN_CLOSE:         goto conn_close_not_reading;
277       case HI_NEED_MORE: /* 2: Incomplete decode, we still hold io->reading. */
278 	ASSERT(io->reading);
279 	break;
280       }
281     }
282   }
283 
284  eagain_out:
285   /* A special problem with EAGAIN: read(2) is not guaranteed to arm edge triggered epoll(2)
286    * unless at least one EAGAIN read has happened. The problem is that as we are still
287    * in io->reading, if after this EAGAIN another thread polls and consumes from todo, it
288    * will not be able to read due to io->reading even though poll told it to read. After
289    * missing the opportunity, the next poll will not report fd anymore because no read has
290    * happened since previous report. Ouch!
291    * Solution attempt: if read was polled, but could not be served due to io->reading.
292    * the PDU is added back to the todo queue. This may cause the other thread to spin
293    * for a while, but at least things will move on eventually. */
294  out:
295   LOCK(io->qel.mut, "clear-reading");
296   D("RD-OUT: LOCK & UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
297   ASSERT(io->reading);
298   io->reading = 0;
299   --io->n_thr;              /* Remove read count. */
300   ASSERT(hit->cur_io == io);
301   ASSERT(hit->cur_n_close == io->n_close);
302   hit->cur_io = 0;
303   D("out of reading(%x) n_thr=%d", io->fd, io->n_thr);
304   ASSERT(io->n_thr >= 0);
305   UNLOCK(io->qel.mut, "clear-reading");
306   return 0;
307 
308  conn_close:
309   /* Connection close due to EOF, etc. We are still in io->reading. */
310   LOCK(io->qel.mut, "clear-reading-close");
311   D("RD-CLO: LOCK & UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
312   ASSERT(io->reading);
313   io->reading = 0;
314   --io->n_thr;                 /* Remove read count. */
315   ASSERT(io->n_thr >= 0);
316   D("out of reading(%x) n_thr=%d (close)", io->fd, io->n_thr);
317   UNLOCK(io->qel.mut, "clear-reading-close");
318   hi_close(hit, io, "hi_read");  /* will clear hit->cur_io */
319   return 1;
320 
321  conn_close_not_reading:
322   /* Connection close after PDU has been read, e.g. protocol level DISCONNECT.
323    * We are no longer in io->reading (but other thread might be).
324    * io->n_thr still needs to be decremented. */
325   LOCK(io->qel.mut, "clear-reading-close");
326   D("RD-NRC: LOCK & UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
327   /*ASSERT(!io->reading); This can not be asserted as other thread might have gone to read. */
328   --io->n_thr;              /* Remove read count. */
329   D("not_reading-close(%x) n_thr=%d r/w=%d/%d ev=%d", io->fd, io->n_thr, io->reading, io->writing, io->events);
330   ASSERT(io->n_thr >= 0);
331   UNLOCK(io->qel.mut, "not_reading-close");
332   hi_close(hit, io, "hi_read-no_reading");  /* will clear hit->cur_io */
333   return 1;
334 }
335 
336 /* EOF  --  hiread.c */
337 
338 #if 0
339 
340 N.B. Be careful to link against the same OpenSSL libraries as the headers. Otherwise
341 reference count errors may appear (at least with 1.0.1 headers against 1.0.0 libs).
342 
343 Mystery SSL error (in the end caused by errors left on the OpenSSL error stack by private key reading)
344 
345 On LinuxMint 12 / Ubuntu / Debian
346 tb784d6e0    hiios.c:201 hi_new_shuffler  	zx d OpenSSL header-version(1000005f) lib-version(OpenSSL 1.0.0e 6 Sep 2011) cflags(compiler: cc -fPIC -DOPENSSL_PIC -DZLIB -DOPENSSL_THREADS -D_REENTRANT -DDSO_DLFCN -DHAVE_DLFCN_H -DL_ENDIAN -DTERMIO -O3 -Wa,--noexecstack -g -Wall) builton(built on: Thu Feb  9 00:57:05 UTC 2012) platform(platform: debian-i386) dir(OPENSSLDIR: "/usr/lib/ssl")
347 
348 // On first connection, after several successful reads and writes
349 
350 tb784d6e0   hiread.c:164 hi_read          	zx d read(9) have=0 need=6 buf_avail=3072
351 tb784d6e0   hiread.c:182 hi_read          	zx E SSL_read ret=-1 err=1
352 tb784d6e0    zxsig.c:404 zx_report_openssl_err 	zx E SSL_read: OpenSSL error(218529960) error:0D0680A8:asn1 encoding routines:ASN1_CHECK_TLEN:wrong tag (tasn_dec.c:1319): ? 0
353 tb784d6e0    zxsig.c:404 zx_report_openssl_err 	zx E SSL_read: OpenSSL error(218546234) error:0D06C03A:asn1 encoding routines:ASN1_D2I_EX_PRIMITIVE:nested asn1 error (tasn_dec.c:831): ? 0
354 tb784d6e0    zxsig.c:404 zx_report_openssl_err 	zx E SSL_read: OpenSSL error(218640442) error:0D08303A:asn1 encoding routines:ASN1_TEMPLATE_NOEXP_D2I:nested asn1 error (tasn_dec.c:751): Field=n, Type=RSA 3
355 tb784d6e0    zxsig.c:404 zx_report_openssl_err 	zx E SSL_read: OpenSSL error(67710980) error:04093004:rsa routines:OLD_RSA_PRIV_DECODE:RSA lib (rsa_ameth.c:115): ? 0
356 
357 // On subsequent connections
358 
359 tb784d6e0   hiread.c:164 hi_read          	zx d read(a) have=0 need=6 buf_avail=3072
360 tb784d6e0    hiios.c:116 zxbus_info_cb    	zx d SSL3 alert write:fatal:protocol version
361 
362 tb784d6e0   hiread.c:182 hi_read          	zx E SSL_read ret=-1 err=1
363 tb784d6e0    zxsig.c:404 zx_report_openssl_err 	zx E SSL_read: OpenSSL error(336130315) error:1408F10B:SSL routines:SSL3_GET_RECORD:wrong version number (s3_pkt.c:339): ? 0
364 
365 // Someone on the net had a similar problem with Postfix mail
366 
367 http://old.nabble.com/error%3A1408F10B%3ASSL-routines%3ASSL3_GET_RECORD%3Awrong-version-number-td33867821.html
368 
369 Known issue (20120518)
370 
371 http://cvs.openssl.org/chngview?cn=22565
372 
373 openssl/ssl/s3_pkt.c 1.72.2.7.2.15 -> 1.72.2.7.2.16
374 
375 --- s3_pkt.c	2012/04/17 13:20:19	1.72.2.7.2.15
376 +++ s3_pkt.c	2012/05/11 13:32:26	1.72.2.7.2.16
377 @@ -744,6 +744,7 @@
378  	 * bytes and record version number > TLS 1.0
379  	 */
380  	if (s->state == SSL3_ST_CW_CLNT_HELLO_B
381 +				&& !s->renegotiate
382  				&& TLS1_get_version(s) > TLS1_VERSION)
383  		*(p++) = 0x1;
384  	else
385 
386 
387 #endif
388