1 /* hitodo.c  -  Hiquu I/O Engine todo queue management
2  * Copyright (c) 2006,2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved.
3  * This is confidential unpublished proprietary source code of the author.
4  * NO WARRANTY, not even implied warranties. Contains trade secrets.
5  * Distribution prohibited unless authorized in writing. See file COPYING.
6  * Special grant: hiios.c may be used with zxid open source project under
7  * same licensing terms as zxid itself.
8  * $Id$
9  *
10  * 15.4.2006, created over Easter holiday --Sampo
11  * 16.8.2012, modified license grant to allow use with ZXID.org --Sampo
12  * 6.9.2012,  added support for TLS and SSL --Sampo
13  * 17.9.2012, factored todo code to its own file --Sampo
14  *
15  * See http://pl.atyp.us/content/tech/servers.html for inspiration on threading strategy.
16  *
17  *   MANY ELEMENTS IN QUEUE            ONE ELEMENT IN Q   EMPTY QUEUE
18  *   consume             produce       consume  produce   consume  produce
19  *    |                   |             | ,-------'         |        |
20  *    V                   V             V V                 V        V
21  *   qel.n --> qel.n --> qel.n --> 0   qel.n --> 0          0        0
22  *
23  ****
24  * accept() blocks (after accept returned EAGAIN) - see if this is a blocking socket
25  * see if edge triggered epoll has some special consideration for accept(2).
26  */
27 
28 #include "platform.h"
29 
30 #include <pthread.h>
31 #include <memory.h>
32 #include <stdlib.h>
33 //#include <unistd.h>
34 #include <fcntl.h>
35 #include <sys/types.h>
36 #include <sys/socket.h>
37 #include <errno.h>
38 #include <string.h>
39 
40 #include <zx/zxid.h>
41 #include "akbox.h"
42 #include "hiproto.h"
43 #include "hiios.h"
44 #include "errmac.h"
45 
46 extern int errmac_debug;
47 
48 const char* qel_kind[] = {
49   "OFF0",
50   "poll1",
51   "listen2",
52   "half_accept3",
53   "tcp_s4",
54   "tcp_c5",
55   "snmp6",
56   "pdu7",
57   0
58 };
59 
60 #define QEL_KIND(x) (((x) >= 0 && (x) < sizeof(qel_kind)/sizeof(char*))?qel_kind[(x)]:"???")
61 
62 /* -------- todo_queue management, waking up threads to consume work (io, pdu) -------- */
63 
64 /*(-) Simple mechanics of deque operation against shf->todo_consumer */
65 
66 /* Called by:  hi_todo_consume */
hi_todo_consume_queue_inlock(struct hiios * shf)67 static struct hi_qel* hi_todo_consume_queue_inlock(struct hiios* shf)
68 {
69   struct hi_qel* qe = shf->todo_consume;
70   shf->todo_consume = qe->n;
71   if (!qe->n)
72     shf->todo_produce = 0;
73   qe->n = 0;
74   qe->intodo = qe->kind == HI_PDU_DIST ? HI_INTODO_PDUINUSE : HI_INTODO_IOINUSE;
75   --shf->n_todo;
76   return qe;
77 }
78 
79 /*(-) Simple mechanics of enque operation against shf->todo_producer */
80 
81 /* Called by:  hi_todo_consume, hi_todo_produce */
hi_todo_produce_queue_inlock(struct hiios * shf,struct hi_qel * qe)82 static void hi_todo_produce_queue_inlock(struct hiios* shf, struct hi_qel* qe)
83 {
84   if (shf->todo_produce)
85     shf->todo_produce->n = qe;
86   else
87     shf->todo_consume = qe;
88   shf->todo_produce = qe;
89   qe->n = 0;
90   qe->intodo = HI_INTODO_INTODO;
91   ++shf->n_todo;
92 }
93 
94 /*(i) Consume from todo queue. If nothing is available,
95  * block until there is work to do. If todo queue is
96  * empty, see if we should poll again. This is the main
97  * mechanism by which worker threads get something to do. */
98 
99 /* Called by:  hi_shuffle */
hi_todo_consume(struct hi_thr * hit)100 struct hi_qel* hi_todo_consume(struct hi_thr* hit)
101 {
102   struct hi_io* io;
103   struct hi_qel* qe;
104   LOCK(hit->shf->todo_mut, "todo_cons");
105   D("LOCK todo_mut.thr=%lx (cond_wait)", (long)hit->shf->todo_mut.thr);
106 
107  deque_again:
108   while (!hit->shf->todo_consume && hit->shf->poll_tok.proto == HIPROTO_POLL_OFF)  /* Empty? */
109     ERRMAC_COND_WAIT(&hit->shf->todo_cond, hit->shf->todo_mut, "todo-cons"); /* Block until work */
110   D("Out of cond_wait todo_mut.thr=%lx", (long)hit->shf->todo_mut.thr);
111 
112   if (!hit->shf->todo_consume) {
113     ASSERT(hit->shf->poll_tok.proto);
114   force_poll:
115     hit->shf->poll_tok.proto = HIPROTO_POLL_OFF;
116     D("UNLK cons-poll todo_mut.thr=%lx", (long)hit->shf->todo_mut.thr);
117     UNLOCK(hit->shf->todo_mut, "todo_cons-poll");
118     return &hit->shf->poll_tok;
119   }
120 
121   qe = hi_todo_consume_queue_inlock(hit->shf);
122   if (!ONE_OF_2(qe->kind, HI_TCP_S, HI_TCP_C)) {
123     D("cons qe_%p kind(%s) intodo=%x todo_mut.thr=%lx", qe, QEL_KIND(qe->kind), qe->intodo, (long)hit->shf->todo_mut.thr);
124     UNLOCK(hit->shf->todo_mut, "todo_cons");
125     return qe;
126   }
127 
128   io = (struct hi_io*)qe;
129   LOCK(io->qel.mut, "n_thr-inc");
130   ASSERT(!hit->cur_io);
131   if (io->n_thr == HI_IO_N_THR_END_POLL) {      /* Special close end game, see hi_close() */
132     io->n_thr = HI_IO_N_THR_END_GAME;
133     hi_todo_produce_queue_inlock(hit->shf, qe); /* Put it back: try again later */
134     UNLOCK(io->qel.mut, "n_thr-poll");
135     goto force_poll;
136   }
137   if (io->n_thr == HI_IO_N_THR_END_GAME) {
138     hit->cur_io = io;
139     hit->cur_n_close = io->n_close;
140     UNLOCK(io->qel.mut, "n_thr-end");
141     hi_close(hit, io, "cons-end");
142     goto deque_again;
143   }
144   if (io->fd & 0x80000000) {
145     D("cons-ign-closed: LK&UNLK io(%x)->qel.thr=%lx n_thr=%d r/w=%d/%d ev=%d intodo=%x", io->fd, (long)io->qel.mut.thr, io->n_thr, io->reading, io->writing, io->events, io->qel.intodo);
146     /* Let it be consumed so that r/w will fail and hi_close() is called to clean up. */
147   }
148 
149   ++io->n_thr;  /* Increase two counts: once for write, and once for read, decrease for intodo ending. Net is +1. */
150   hit->cur_io = io;
151   hit->cur_n_close = io->n_close;
152   D("cons: LK&UNLK io(%x)->qel.thr=%lx n_thr=%d r/w=%d/%d ev=%x intodo=%x", io->fd, (long)io->qel.mut.thr, io->n_thr, io->reading, io->writing, io->events, io->qel.intodo);
153   UNLOCK(io->qel.mut, "n_thr-inc");
154   D("UNLK todo_mut.thr=%lx", (long)hit->shf->todo_mut.thr);
155   UNLOCK(hit->shf->todo_mut, "todo_cons-tcp");
156   return qe;
157 }
158 
159 /*(i) Schedule new work to be done, potentially waking up the consumer threads!
160  * It is important that for HI_TCP_S and HI_TCP_C ios the n_thr is nonzero
161  * while calling this. This is to block a race to hi_close(). For poll,
162  * listener, or pdu type todos there is no such consideration.
163  * locking:: Takes todo_mut and io->qel.mut */
164 
165 /* Called by:  hi_accept, hi_accept_book, hi_close, hi_in_out, hi_poll x3, hi_send0, stomp_msg_deliver, zxbus_sched_new_delivery, zxbus_sched_pending_delivery */
hi_todo_produce(struct hi_thr * hit,struct hi_qel * qe,const char * lk,int from_poll)166 void hi_todo_produce(struct hi_thr* hit, struct hi_qel* qe, const char* lk, int from_poll)
167 {
168   struct hi_io* io;
169   LOCK(hit->shf->todo_mut, "todo_prod");
170   D("%s: LOCK todo_mut.thr=%lx", lk, (long)hit->shf->todo_mut.thr);
171 
172   if (qe->intodo == HI_INTODO_INTODO) {
173     if (ONE_OF_2(qe->kind, HI_TCP_S, HI_TCP_C)) {
174       io = ((struct hi_io*)qe);
175       D("%s: prod already in todo(%x) n_thr=%d r/w=%d/%d ev=%x", lk, io->fd, io->n_thr, io->reading, io->writing, io->events);
176       if (io->fd & 0x80000000)
177 	D("%s: prod-closed fd(%x) intodo! n_thr=%d r/w=%d/%d ev=%x intodo=%x", lk, io->fd, io->n_thr, io->reading, io->writing, io->events, io->qel.intodo);
178     } else {
179       D("%s: prod already in todo qe_%p kind(%s)", lk, qe, QEL_KIND(qe->kind));
180     }
181     goto out;
182   }
183 
184   if (!ONE_OF_2(qe->kind, HI_TCP_S, HI_TCP_C)) {
185     D("%s: prod qe(%p) kind(%s)", lk, qe, QEL_KIND(qe->kind));
186     goto produce;
187   }
188 
189   io = (struct hi_io*)qe;
190   LOCK(io->qel.mut, "n_thr-inc-todo");
191   if (from_poll) {
192     /* Detect already closed (or even end game) io, see hi_close(). Note that
193      * this detection only needs to apply to produce from poll. */
194     if (io->n_thr == HI_IO_N_THR_END_POLL || io->fd & 0x80000000) {
195       D("%s: prod(%x)-ign LK&UNLK n_c/t=%d/%d r/w=%d/%d ev=%x", lk, io->fd, io->n_close, io->n_thr, io->reading, io->writing, io->events);
196       UNLOCK(io->qel.mut, "n_thr-inc-ign");
197       goto out;
198     }
199     ASSERTOPI(io->n_thr, >=, 0);
200     ++io->n_thr;  /* Should have been done already by caller, but for poll optimize lock. */
201   } else {
202     if (io->n_thr != HI_IO_N_THR_END_POLL) {
203       ASSERTOPI(io->n_thr, >=, 0);
204     }
205   }
206   //if (io->fd & 0x80000000) { /* *** fast fail hi_close() ? */ }
207   D("%s: prod(%x) LK&UNLK n_c/t=%d/%d r/w=%d/%d ev=%x", lk, io->fd, io->n_close, io->n_thr, io->reading, io->writing, io->events);
208   UNLOCK(io->qel.mut, "n_thr-inc-todo");
209 
210 produce:
211   hi_todo_produce_queue_inlock(hit->shf, qe);
212   ERRMAC_COND_SIG(&hit->shf->todo_cond, "todo-prod");  /* Wake up consumers */
213 
214  out:
215   D("%s: UNLOCK todo_mut.thr=%lx", lk, (long)hit->shf->todo_mut.thr);
216   UNLOCK(hit->shf->todo_mut, "todo_prod");
217 }
218 
219 /* EOF  --  hitodo.c */
220