1 /* hitodo.c - Hiquu I/O Engine todo queue management
2 * Copyright (c) 2006,2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved.
3 * This is confidential unpublished proprietary source code of the author.
4 * NO WARRANTY, not even implied warranties. Contains trade secrets.
5 * Distribution prohibited unless authorized in writing. See file COPYING.
6 * Special grant: hiios.c may be used with zxid open source project under
7 * same licensing terms as zxid itself.
8 * $Id$
9 *
10 * 15.4.2006, created over Easter holiday --Sampo
11 * 16.8.2012, modified license grant to allow use with ZXID.org --Sampo
12 * 6.9.2012, added support for TLS and SSL --Sampo
13 * 17.9.2012, factored todo code to its own file --Sampo
14 *
15 * See http://pl.atyp.us/content/tech/servers.html for inspiration on threading strategy.
16 *
17 * MANY ELEMENTS IN QUEUE ONE ELEMENT IN Q EMPTY QUEUE
18 * consume produce consume produce consume produce
19 * | | | ,-------' | |
20 * V V V V V V
21 * qel.n --> qel.n --> qel.n --> 0 qel.n --> 0 0 0
22 *
23 ****
24 * accept() blocks (after accept returned EAGAIN) - see if this is a blocking socket
25 * see if edge triggered epoll has some special consideration for accept(2).
26 */
27
28 #include "platform.h"
29
30 #include <pthread.h>
31 #include <memory.h>
32 #include <stdlib.h>
33 //#include <unistd.h>
34 #include <fcntl.h>
35 #include <sys/types.h>
36 #include <sys/socket.h>
37 #include <errno.h>
38 #include <string.h>
39
40 #include <zx/zxid.h>
41 #include "akbox.h"
42 #include "hiproto.h"
43 #include "hiios.h"
44 #include "errmac.h"
45
46 extern int errmac_debug;
47
/* Human readable names for hi_qel.kind codes, indexed by kind value.
 * Terminated by a NULL sentinel so the table can also be scanned. */
const char* qel_kind[] = {
  "OFF0",
  "poll1",
  "listen2",
  "half_accept3",
  "tcp_s4",
  "tcp_c5",
  "snmp6",
  "pdu7",
  0
};

/* Map a kind code to its name for debug output; out of range values yield "???".
 * The -1 excludes the terminating NULL sentinel from the valid range so this
 * can never hand a null pointer to a %s format (the unadjusted bound would
 * return qel_kind[8] == 0 for kind 8). */
#define QEL_KIND(x) (((x) >= 0 && (x) < sizeof(qel_kind)/sizeof(char*)-1)?qel_kind[(x)]:"???")
61
62 /* -------- todo_queue management, waking up threads to consume work (io, pdu) -------- */
63
64 /*(-) Simple mechanics of deque operation against shf->todo_consumer */
65
66 /* Called by: hi_todo_consume */
hi_todo_consume_queue_inlock(struct hiios * shf)67 static struct hi_qel* hi_todo_consume_queue_inlock(struct hiios* shf)
68 {
69 struct hi_qel* qe = shf->todo_consume;
70 shf->todo_consume = qe->n;
71 if (!qe->n)
72 shf->todo_produce = 0;
73 qe->n = 0;
74 qe->intodo = qe->kind == HI_PDU_DIST ? HI_INTODO_PDUINUSE : HI_INTODO_IOINUSE;
75 --shf->n_todo;
76 return qe;
77 }
78
79 /*(-) Simple mechanics of enque operation against shf->todo_producer */
80
81 /* Called by: hi_todo_consume, hi_todo_produce */
hi_todo_produce_queue_inlock(struct hiios * shf,struct hi_qel * qe)82 static void hi_todo_produce_queue_inlock(struct hiios* shf, struct hi_qel* qe)
83 {
84 if (shf->todo_produce)
85 shf->todo_produce->n = qe;
86 else
87 shf->todo_consume = qe;
88 shf->todo_produce = qe;
89 qe->n = 0;
90 qe->intodo = HI_INTODO_INTODO;
91 ++shf->n_todo;
92 }
93
94 /*(i) Consume from todo queue. If nothing is available,
95 * block until there is work to do. If todo queue is
96 * empty, see if we should poll again. This is the main
97 * mechanism by which worker threads get something to do. */
98
/* Called by: hi_shuffle */
struct hi_qel* hi_todo_consume(struct hi_thr* hit)
{
  struct hi_io* io;
  struct hi_qel* qe;
  LOCK(hit->shf->todo_mut, "todo_cons");
  D("LOCK todo_mut.thr=%lx (cond_wait)", (long)hit->shf->todo_mut.thr);

 deque_again:
  /* Sleep while there is neither queued work nor an available poll token.
   * poll_tok.proto doubles as the token: HIPROTO_POLL_OFF means some other
   * thread already holds (or is performing) the poll. */
  while (!hit->shf->todo_consume && hit->shf->poll_tok.proto == HIPROTO_POLL_OFF) /* Empty? */
    ERRMAC_COND_WAIT(&hit->shf->todo_cond, hit->shf->todo_mut, "todo-cons"); /* Block until work */
  D("Out of cond_wait todo_mut.thr=%lx", (long)hit->shf->todo_mut.thr);

  if (!hit->shf->todo_consume) {
    /* Queue empty but poll token available: this thread takes the token. */
    ASSERT(hit->shf->poll_tok.proto);
 force_poll:
    hit->shf->poll_tok.proto = HIPROTO_POLL_OFF;  /* claim the token so only one thread polls */
    D("UNLK cons-poll todo_mut.thr=%lx", (long)hit->shf->todo_mut.thr);
    UNLOCK(hit->shf->todo_mut, "todo_cons-poll");
    return &hit->shf->poll_tok;
  }

  qe = hi_todo_consume_queue_inlock(hit->shf);
  if (!ONE_OF_2(qe->kind, HI_TCP_S, HI_TCP_C)) {
    /* Non-tcp work (poll, listener, pdu) needs no per-io bookkeeping. */
    D("cons qe_%p kind(%s) intodo=%x todo_mut.thr=%lx", qe, QEL_KIND(qe->kind), qe->intodo, (long)hit->shf->todo_mut.thr);
    UNLOCK(hit->shf->todo_mut, "todo_cons");
    return qe;
  }

  /* tcp io: account for the consuming thread in io->n_thr before releasing
   * todo_mut. NOTE(review): io->qel.mut is taken while still holding todo_mut;
   * presumably this lock order is observed everywhere — verify against hi_close(). */
  io = (struct hi_io*)qe;
  LOCK(io->qel.mut, "n_thr-inc");
  ASSERT(!hit->cur_io);
  if (io->n_thr == HI_IO_N_THR_END_POLL) { /* Special close end game, see hi_close() */
    io->n_thr = HI_IO_N_THR_END_GAME;
    hi_todo_produce_queue_inlock(hit->shf, qe);  /* Put it back: try again later */
    UNLOCK(io->qel.mut, "n_thr-poll");
    goto force_poll;                             /* the requested poll happens now */
  }
  if (io->n_thr == HI_IO_N_THR_END_GAME) {
    /* Close end game reached: finish the close instead of returning the io. */
    hit->cur_io = io;
    hit->cur_n_close = io->n_close;
    UNLOCK(io->qel.mut, "n_thr-end");
    hi_close(hit, io, "cons-end");
    goto deque_again;                            /* todo_mut still held; look for other work */
  }
  if (io->fd & 0x80000000) {                     /* high bit marks an already closed fd */
    D("cons-ign-closed: LK&UNLK io(%x)->qel.thr=%lx n_thr=%d r/w=%d/%d ev=%d intodo=%x", io->fd, (long)io->qel.mut.thr, io->n_thr, io->reading, io->writing, io->events, io->qel.intodo);
    /* Let it be consumed so that r/w will fail and hi_close() is called to clean up. */
  }

  ++io->n_thr; /* Increase two counts: once for write, and once for read, decrease for intodo ending. Net is +1. */
  hit->cur_io = io;
  hit->cur_n_close = io->n_close;                /* snapshot close generation for later comparison */
  D("cons: LK&UNLK io(%x)->qel.thr=%lx n_thr=%d r/w=%d/%d ev=%x intodo=%x", io->fd, (long)io->qel.mut.thr, io->n_thr, io->reading, io->writing, io->events, io->qel.intodo);
  UNLOCK(io->qel.mut, "n_thr-inc");
  D("UNLK todo_mut.thr=%lx", (long)hit->shf->todo_mut.thr);
  UNLOCK(hit->shf->todo_mut, "todo_cons-tcp");
  return qe;
}
158
159 /*(i) Schedule new work to be done, potentially waking up the consumer threads!
160 * It is important that for HI_TCP_S and HI_TCP_C ios the n_thr is nonzero
161 * while calling this. This is to block a race to hi_close(). For poll,
162 * listener, or pdu type todos there is no such consideration.
163 * locking:: Takes todo_mut and io->qel.mut */
164
/* Called by: hi_accept, hi_accept_book, hi_close, hi_in_out, hi_poll x3, hi_send0, stomp_msg_deliver, zxbus_sched_new_delivery, zxbus_sched_pending_delivery */
void hi_todo_produce(struct hi_thr* hit, struct hi_qel* qe, const char* lk, int from_poll)
{
  struct hi_io* io;
  LOCK(hit->shf->todo_mut, "todo_prod");
  D("%s: LOCK todo_mut.thr=%lx", lk, (long)hit->shf->todo_mut.thr);

  if (qe->intodo == HI_INTODO_INTODO) {
    /* Already queued: never enqueue twice, just log and bail out. */
    if (ONE_OF_2(qe->kind, HI_TCP_S, HI_TCP_C)) {
      io = ((struct hi_io*)qe);
      D("%s: prod already in todo(%x) n_thr=%d r/w=%d/%d ev=%x", lk, io->fd, io->n_thr, io->reading, io->writing, io->events);
      if (io->fd & 0x80000000)   /* high bit marks an already closed fd */
	D("%s: prod-closed fd(%x) intodo! n_thr=%d r/w=%d/%d ev=%x intodo=%x", lk, io->fd, io->n_thr, io->reading, io->writing, io->events, io->qel.intodo);
    } else {
      D("%s: prod already in todo qe_%p kind(%s)", lk, qe, QEL_KIND(qe->kind));
    }
    goto out;
  }

  if (!ONE_OF_2(qe->kind, HI_TCP_S, HI_TCP_C)) {
    /* Poll, listener, or pdu work: no n_thr accounting needed (see header comment). */
    D("%s: prod qe(%p) kind(%s)", lk, qe, QEL_KIND(qe->kind));
    goto produce;
  }

  io = (struct hi_io*)qe;
  LOCK(io->qel.mut, "n_thr-inc-todo");
  if (from_poll) {
    /* Detect already closed (or even end game) io, see hi_close(). Note that
     * this detection only needs to apply to produce from poll. */
    if (io->n_thr == HI_IO_N_THR_END_POLL || io->fd & 0x80000000) {
      D("%s: prod(%x)-ign LK&UNLK n_c/t=%d/%d r/w=%d/%d ev=%x", lk, io->fd, io->n_close, io->n_thr, io->reading, io->writing, io->events);
      UNLOCK(io->qel.mut, "n_thr-inc-ign");
      goto out;                        /* drop the produce: io is being torn down */
    }
    ASSERTOPI(io->n_thr, >=, 0);
    ++io->n_thr;  /* Should have been done already by caller, but for poll optimize lock. */
  } else {
    /* Non-poll callers must already hold a nonzero n_thr (see function header)
     * unless the io is in the END_POLL close state. */
    if (io->n_thr != HI_IO_N_THR_END_POLL) {
      ASSERTOPI(io->n_thr, >=, 0);
    }
  }
  //if (io->fd & 0x80000000) { /* *** fast fail hi_close() ? */ }
  D("%s: prod(%x) LK&UNLK n_c/t=%d/%d r/w=%d/%d ev=%x", lk, io->fd, io->n_close, io->n_thr, io->reading, io->writing, io->events);
  UNLOCK(io->qel.mut, "n_thr-inc-todo");

 produce:
  hi_todo_produce_queue_inlock(hit->shf, qe);
  ERRMAC_COND_SIG(&hit->shf->todo_cond, "todo-prod");  /* Wake up consumers */

 out:
  D("%s: UNLOCK todo_mut.thr=%lx", lk, (long)hit->shf->todo_mut.thr);
  UNLOCK(hit->shf->todo_mut, "todo_prod");
}
218
219 /* EOF -- hitodo.c */
220