1 /* hiios.c  -  Hiquu I/O Engine I/O shuffler
2  * Copyright (c) 2006,2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved.
3  * This is confidential unpublished proprietary source code of the author.
4  * NO WARRANTY, not even implied warranties. Contains trade secrets.
5  * Distribution prohibited unless authorized in writing. See file COPYING.
6  * Special grant: hiios.c may be used with zxid open source project under
7  * same licensing terms as zxid itself.
8  * $Id$
9  *
10  * 15.4.2006, created over Easter holiday --Sampo
11  * 16.8.2012, modified license grant to allow use with ZXID.org --Sampo
12  * 6.9.2012,  added support for TLS and SSL --Sampo
13  * 17.9.2012, factored init, todo, and net code to their own files --Sampo
14  *
15  * See http://pl.atyp.us/content/tech/servers.html for inspiration on threading strategy.
16  *
17  *   MANY ELEMENTS IN QUEUE            ONE ELEMENT IN Q   EMPTY QUEUE
18  *   consume             produce       consume  produce   consume  produce
19  *    |                   |             | ,-------'         |        |
20  *    V                   V             V V                 V        V
21  *   qel.n --> qel.n --> qel.n --> 0   qel.n --> 0          0        0
22  *
23  * *** see if edge triggered epoll has some special consideration for accept(2).
24  */
25 
26 #include "platform.h"
27 
28 #include <pthread.h>
29 #include <memory.h>
30 #include <stdlib.h>
31 //#include <unistd.h>
32 #include <fcntl.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <errno.h>
36 #include <string.h>
37 
38 #include <zx/zxid.h>
39 #include "akbox.h"
40 #include "hiproto.h"
41 #include "hiios.h"
42 #include "errmac.h"
43 
44 extern int errmac_debug;
45 #ifdef MUTEX_DEBUG
46 extern pthread_mutexattr_t MUTEXATTR_DECL;
47 #endif
48 
49 /*() Close an I/O object (in multiple stages)
 * The close may be called in response to I/O errors or for controlled
 * disconnect. At the time of the first call to close, any number of
 * threads (see io->n_thr) may be accessing the io object, and the
 * io object may still be in the todo queue or be returned by poll.
 * We need to wait for all these possibilities to flush out.
 *
 * We start by deregistering the io from poll and half closing it
 * so that no more reads are possible (writing, e.g. to send a TLS
 * disconnect, is still possible). As the different threads find the
 * io object unusable, they decrement io->n_thr and call hi_close().
60  *
61  * It is important to not fully close(2) the socket as doing so
62  * would allow an accept(2) that would almost certainly use the
63  * same fd number. This would cause the still pending threads
64  * to act on the new connection, which would be a serious error.
65  *
 * Once io->n_thr reaches 0, the only possible source of activity
 * for the fd is that it may still be returned by poll. Thus we start
 * an end game, indicated by the special io->n_thr values
 * HI_IO_N_THR_END_POLL and, finally, HI_IO_N_THR_END_GAME, where we
 * put a poll and then the io object into the todo queue. This way,
 * if the poll was about to return the io, it is forced to do so.
 * After the poll, the io is consumed from todo one last time and we
 * can safely close(2) the fd.
73  *
 * By the time we are really ready to close the io, all associated PDUs
 * have been freed by the respective threads (usually the write of a
 * response frees both the response and the request).
77  *
78  * locking:: will take io->qel.mut */
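
/* A sketch of the n_thr end game described above (constant names are from
 * hiios.h; the transition from HI_IO_N_THR_END_POLL to HI_IO_N_THR_END_GAME
 * is assumed to happen in hi_todo_consume(), per the comment at the produce
 * call below):
 *
 *   io->n_thr > 0                      threads and/or poll still hold the io
 *   io->n_thr == 0                     set HI_IO_N_THR_END_POLL, produce to todo
 *   io->n_thr == HI_IO_N_THR_END_GAME  final consume from todo; close(2) is safe
 */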
79 
80 /* Called by:  hi_in_out x2, hi_read x2, hi_todo_consume, hi_write */
81 void hi_close(struct hi_thr* hit, struct hi_io* io, const char* lk)
82 {
83   struct hi_pdu* pdu;
84   int fd = io->fd;
85   DD("%s: closing(%x) n_c/t=%d/%d", lk, fd, io->n_close, io->n_thr);
86   LOCK(io->qel.mut, "hi_close");
87   D("LOCK io(%x)->qel.thr=%lx n_c/t=%d/%d", fd, (long)io->qel.mut.thr, io->n_close, io->n_thr);
88 
  if (fd & 0x80000000) {
90     D("%s: 2nd close(%x) n_c/t=%d/%d", lk, fd, io->n_close, io->n_thr);
91   } else {
92     INFO("%s: 1st close(%x) n_c/t=%d/%d", lk, fd, io->n_close, io->n_thr);
93     if (shutdown(fd, SHUT_RD))
94       ERR("%s: shutdown(%x) %d %s", lk, fd, errno, STRERROR(errno));
95     hi_del_fd(hit, fd);    /* stop poll from returning this fd */
96     io->fd |= 0x80000000;  /* mark as closed */
97     ASSERTOPI(io->n_thr, >, 0);
98     --io->n_thr;  /* Will not be returned by poll any more, thus remove poll "virtual thread" */
99   }
100 
101   ASSERT(io->qel.intodo != HI_INTODO_SHF_FREE);
102   ASSERT(hit->cur_io == io);
103   if (hit->cur_n_close != io->n_close) {
104     ERR("%s: already closed(%x) cur_n_close=%d != n_close=%d",lk,fd,hit->cur_n_close,io->n_close);
105     hit->cur_io = 0;
106     D("UNLOCK io(%x)->qel.thr=%lx", fd, (long)io->qel.mut.thr);
107     UNLOCK(io->qel.mut, "hi_close-already");
108     return;
109   }
110 
111   /* N.B. n_thr manipulations should be done before calling hi_close() */
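  /* Caller pattern (cf. hi_in_out() below): under io->qel.mut, decrement
   * io->n_thr for each role the thread gives up, unlock, then call
   * hi_close(), which re-takes the lock and re-evaluates the state here. */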
112   if (io->n_thr > 0) {
113     D("%s: close-wait(%x) n_c/t=%d/%d intodo=%x", lk, fd, io->n_close, io->n_thr, io->qel.intodo);
114     hit->cur_io = 0;
115     D("UNLOCK io(%x)->qel.thr=%lx", fd, (long)io->qel.mut.thr);
116     UNLOCK(io->qel.mut, "hi_close-wait");
117     return;
118   }
119   if (io->n_thr == 0) {
120     io->n_thr = HI_IO_N_THR_END_POLL;
121     D("%s: close-poll(%x) n_c/t=%d/%d intodo=%x", lk, fd, io->n_close, io->n_thr, io->qel.intodo);
122     hit->cur_io = 0;
123     D("UNLOCK io(%x)->qel.thr=%lx", fd, (long)io->qel.mut.thr);
124     UNLOCK(io->qel.mut, "hi_close-poll");
125     hi_todo_produce(hit, &io->qel, "close-poll", 0); /* Trigger 1st poll, see hi_todo_consume() */
126     return;
127   }
128   if (io->n_thr != HI_IO_N_THR_END_GAME) {
129     ERR("%s: close-n_thr(%x) n_c/t=%d/%d intodo=%x", lk,fd,io->n_close,io->n_thr,io->qel.intodo);
130     ASSERTOPI(io->n_thr, ==, HI_IO_N_THR_END_GAME);
131     hit->cur_io = 0;
132     D("UNLOCK io(%x)->qel.thr=%lx", fd, (long)io->qel.mut.thr);
133     UNLOCK(io->qel.mut, "hi_close-n_thr");
134     return;
135   }
136 
137   /* Now we are ready to really close */
138 
139   D("%s: close-final(%x) n_c/t=%d/%d", lk, io->fd, io->n_close, io->n_thr);
140 
141   for (pdu = io->reqs; pdu; pdu = pdu->n)
142     hi_free_req(hit, pdu, "close-reqs ");
143   io->reqs = 0;
144   for (pdu = io->pending; pdu; pdu = pdu->n)
145     hi_free_req(hit, pdu, "close-pend ");
146   io->pending = 0;
147 
148   if (io->cur_pdu) {
149     hi_free_req(hit, io->cur_pdu, "close-cur ");
150     io->cur_pdu = hi_pdu_alloc(hit, "cur_pdu-clo");  /* *** Could we recycle the PDU without freeing? */
151     io->cur_pdu->fe = io;
152   }
153 #ifdef ENA_S5066
154   void sis_clean(struct hi_io* io);
155   sis_clean(io);
156 #endif
157 
158   /* Clear the association with entity as late as possible so ACKs may
159    * get a chance of being processed and written. */
160   if (io->ent) {
161     if (io->ent->io == io) {
162       io->ent->io = 0;
163       /*INFO("Dissociate ent_%p (%s) from io(%x)", io->ent, io->ent->eid, io->fd);*/
164       INFO("Dissociate ent_%p from io(%x)", io->ent, io->fd);
165     } else {
166       WARN("io(%x)->ent and ent->io(%x) are diff", io->fd, io->ent->io?io->ent->io->fd:-1);
167     }
168     io->ent = 0;
169   } else {
170     ERR("io(%x) has no entity associated", io->fd);
171   }
172 
173 #ifdef USE_OPENSSL
174   if (io->ssl) {
175     SSL_shutdown(io->ssl);
176     SSL_free(io->ssl);
177     io->ssl = 0;
178   }
179 #endif
180   ASSERTOPI(io->qel.intodo, ==, HI_INTODO_IOINUSE); /* HI_INTODO_INTODO should not be possible anymore. */
181   io->qel.intodo = HI_INTODO_SHF_FREE;
182   io->n_thr = 0;
183   ++io->n_close;
184   hit->cur_io = 0;
  close(io->fd & 0x7fffffff); /* Now some other thread may reuse the slot by accept()ing the same fd */
186   INFO("%s: CLOSED(%x) n_close=%d", lk, io->fd, io->n_close);
187 
188   /* Must let go of the lock only after close so no read can creep in. */
189   D("UNLOCK io(%x)->qel.thr=%lx", fd, (long)io->qel.mut.thr);
190   UNLOCK(io->qel.mut, "hi_close");
191 }
192 
193 /* ---------- shuffler ---------- */
194 
195 /*() For a fd that was consumed from todo, deal with potential reads and writes */
196 
197 /* Called by:  hi_shuffle */
198 void hi_in_out(struct hi_thr* hit, struct hi_io* io)
199 {
200   int reading;
201 
202   LOCK(io->qel.mut, "in_out");
203   D("LOCK io(%x)->qel.thr=%lx r/w=%d/%d ev=%x", io->fd, (long)io->qel.mut.thr, io->reading, io->writing, io->events);
204   if (io->events & (EPOLLHUP | EPOLLERR)) {
205     D("HUP or ERR on fd=%x events=0x%x", io->fd, io->events);
206   close:
207     io->n_thr -= 2;                   /* Remove both counts (write and read) */
208     ASSERT(io->n_thr >= 0);
209     UNLOCK(io->qel.mut, "in_out-hup");
210     hi_close(hit, io, "hi_in_out-hup");
211     return;
212   }
213 
  /* We must ensure that only one thread is trying to write. The poll may
   * still report the io as writable after a thread has already taken the
   * task; in that case we want the second thread to skip the write and
   * go process the read. */
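  /* Illustrative interleaving (assumed) of the double report this guards against:
   *   thr A: poll reports EPOLLOUT -> sets io->writing -> calls hi_write()
   *   thr B: stale EPOLLOUT from an earlier poll -> sees io->writing under the
   *          lock -> skips the write and falls through to the read handling. */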
218   if (io->events & EPOLLOUT && !io->writing) {
219     D("OUT fd=%x n_iov=%d n_to_write=%d writing", io->fd, io->n_iov, io->n_to_write);
220 
221     /* Although in_write is checked in hi_write() as well, take the opportunity
222      * to check it right here while we already hold the lock. */
223     if (!io->in_write)  /* Need to prepare new iov? */
224       hi_make_iov_nolock(io);
225     if (io->in_write) {
226       io->writing = 1;
227       D("UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
228       UNLOCK(io->qel.mut, "check-writing-enter");
229 
230       if (hi_write(hit, io))  { /* will clear io->writing */
231 	LOCK(io->qel.mut, "n_thr-dec2");
232 	D("IN_OUT: LOCK & UNLOCK io(%x)->qel.thr=%lx closed", io->fd, (long)io->qel.mut.thr);
233 	--io->n_thr;            /* Remove read count, write count already removed by hi_write() */
234 	ASSERT(io->n_thr >= 0);
235 	ASSERT(hit->cur_io == io);
236 	ASSERT(hit->cur_n_close == io->n_close);
237 	UNLOCK(io->qel.mut, "n_thr-dec2");
238 	hi_close(hit, io, "write-shortcircuit-close");  /* Close again, now n_thr was reduced */
239 	return; /* Write caused close, read will be futile */
240       } else {
241 	LOCK(io->qel.mut, "check-reading");
242 	D("LOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
243       }
244     } else {
245       if (io->fd & 0x80000000) {
	/* Apparently already closed, but since nothing was written, there was no opportunity to see the error. */
247 	D("nothing to write and closed io(%x)->n_thr=%d", io->fd, io->n_thr);
248 	goto close;
249       }
250       --io->n_thr;              /* Remove write count as no write happened. */
251       D("no inwrite io(%x)->n_thr=%d", io->fd, io->n_thr);
252     }
253   } else {
254     --io->n_thr;              /* Remove write count as no write happened. */
255     D("no EPOLLOUT io(%x)->n_thr=%d", io->fd, io->n_thr);
256   }
257   ASSERT(io->n_thr > 0 || io->n_thr == HI_IO_N_THR_END_GAME);  /* Read cnt should still be there */
258   io->events &= ~EPOLLOUT;  /* Clear poll flag in case we get read rescheduling */
259 
260   if (io->events & EPOLLIN) {
    /* A special problem with EAGAIN: read(2) is not guaranteed to arm edge triggered epoll(2)
     * unless at least one EAGAIN read has happened. The problem is that while we are still
     * in io->reading, if after this EAGAIN another thread polls and consumes the io from todo, it
     * will not be able to read due to io->reading, even though poll told it to read. After
     * missing the opportunity, the next poll will not report the fd anymore because no read has
     * happened since the previous report. Ouch!
     * Solution attempt: if a read was polled, but could not be served due to io->reading,
     * the io is added back to the todo queue. This may cause the other thread to spin
     * for a while, but at least things will move on eventually. */
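    /* Assumed timeline of the lost edge trigger the rescheduling avoids:
     *   thr A: read(2) -> EAGAIN (re-arms epoll), io->reading still set
     *   thr B: poll reports EPOLLIN, consumes io from todo, blocked by io->reading
     *          -> produces io back to todo (else branch below) rather than dropping
     *             the only notification this edge will generate. */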
270     if (!io->reading) {
271       D("IN fd=%x cur_pdu=%p need=%d", io->fd, io->cur_pdu, io->cur_pdu->need);
      /* Poll says work is possible: schedule work for the io if it is not under work
       * yet, or if cur_pdu needs work. The inverse is also important: if io->cur_pdu
       * is set, but pdu->need is not, then someone is already working on decoding
       * the cur_pdu and we should not interfere. */
275       reading = io->reading = io->cur_pdu->need; /* only place where io->reading is set */
276       D("UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
277       UNLOCK(io->qel.mut, "check-reading");
278       if (reading) {
279 	hi_read(hit, io);       /* io->n_thr and hit->cur_io have already been updated */
280 	ASSERT(!hit->cur_io);
281       } else {
282 	LOCK(io->qel.mut, "n_thr-dec3");
283 	D("IN_OUT: LOCK & UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
284 	--io->n_thr;            /* Remove read count as no read happened. */
285 	ASSERT(io->n_thr >= 0);
286 	ASSERT(hit->cur_io == io);
287 	ASSERT(hit->cur_n_close == io->n_close);
288 	hit->cur_io = 0;
289 	UNLOCK(io->qel.mut, "n_thr-dec3");
290       }
291     } else {
292       ASSERT(io->n_thr > 0);
293       ASSERT(hit->cur_io == io);
294       ASSERT(hit->cur_n_close == io->n_close);
295       /*--io->n_thr;     * Do not decrement. We need to keep n_thr until we are in todo queue. */
296       hit->cur_io = 0;
297       D("UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
298       UNLOCK(io->qel.mut, "n_thr-dec4");
299       D("resched(%x) to avoid miss poll read n_thr=%d", io->fd, io->n_thr);
300       hi_todo_produce(hit, &io->qel, "reread", 0);  /* try again so read poll is not lost */
301     }
302   } else {
303     --io->n_thr;              /* Remove read count as no read happened. */
304     ASSERT(io->n_thr >= 0);
305     ASSERT(hit->cur_io == io);
306     ASSERT(hit->cur_n_close == io->n_close);
307     hit->cur_io = 0;
308     D("UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr);
309     UNLOCK(io->qel.mut, "n_thr-dec5");
310   }
311 }
312 
313 /*() Main I/O shuffling loop. Never returns. Main loop of most (all?) threads. */
314 
315 /* Called by:  thread_loop, zxbusd_main */
316 void hi_shuffle(struct hi_thr* hit, struct hiios* shf)
317 {
318   struct hi_qel* qe;
319   hit->shf = shf;
320   LOCK(shf->todo_mut, "add-thread");
321   hit->n = shf->threads;
322   shf->threads = hit;
323   UNLOCK(shf->todo_mut, "add-thread");
324   INFO("Start shuffling hit(%p) shf(%p)", hit, shf);
325   hi_sanity_shf(255, shf);
326   while (1) {
327     HI_SANITY(hit->shf, hit);
    qe = hi_todo_consume(hit);  /* Wakes up the herd to receive work. */
329     switch (qe->kind) {
330     case HI_POLLT:    hi_poll(hit); break;
331     case HI_LISTENT:  hi_accept(hit, (struct hi_io*)qe); break;
    case HI_HALF_ACCEPT: hi_accept_book(hit, (struct hi_io*)qe, ((struct hi_io*)qe)->fd); break;
333     case HI_TCP_C:
334     case HI_TCP_S:    hi_in_out(hit, (struct hi_io*)qe); break;
335     case HI_PDU_DIST: stomp_msg_deliver(hit, (struct hi_pdu*)qe); break;
336 #ifdef HAVE_NET_SNMP
337     case HI_SNMP:     if (snmp_port) processSNMP(); break; /* *** needs more thought */
338 #endif
339     default: NEVER("unknown qel->kind 0x%x", qe->kind);
340     }
341   }
342 }
343 
344 /* EOF  --  hiios.c */
345