1 /* hiios.c - Hiquu I/O Engine I/O shuffler 2 * Copyright (c) 2006,2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved. 3 * This is confidential unpublished proprietary source code of the author. 4 * NO WARRANTY, not even implied warranties. Contains trade secrets. 5 * Distribution prohibited unless authorized in writing. See file COPYING. 6 * Special grant: hiios.c may be used with zxid open source project under 7 * same licensing terms as zxid itself. 8 * $Id$ 9 * 10 * 15.4.2006, created over Easter holiday --Sampo 11 * 16.8.2012, modified license grant to allow use with ZXID.org --Sampo 12 * 6.9.2012, added support for TLS and SSL --Sampo 13 * 17.9.2012, factored init, todo, and net code to their own files --Sampo 14 * 15 * See http://pl.atyp.us/content/tech/servers.html for inspiration on threading strategy. 16 * 17 * MANY ELEMENTS IN QUEUE ONE ELEMENT IN Q EMPTY QUEUE 18 * consume produce consume produce consume produce 19 * | | | ,-------' | | 20 * V V V V V V 21 * qel.n --> qel.n --> qel.n --> 0 qel.n --> 0 0 0 22 * 23 * *** see if edge triggered epoll has some special consideration for accept(2). 24 */ 25 26 #include "platform.h" 27 28 #include <pthread.h> 29 #include <memory.h> 30 #include <stdlib.h> 31 //#include <unistd.h> 32 #include <fcntl.h> 33 #include <sys/types.h> 34 #include <sys/socket.h> 35 #include <errno.h> 36 #include <string.h> 37 38 #include <zx/zxid.h> 39 #include "akbox.h" 40 #include "hiproto.h" 41 #include "hiios.h" 42 #include "errmac.h" 43 44 extern int errmac_debug; 45 #ifdef MUTEX_DEBUG 46 extern pthread_mutexattr_t MUTEXATTR_DECL; 47 #endif 48 49 /*() Close an I/O object (in multiple stages) 50 * The close may be called in response to I/O errors or for controlled 51 * disconnect. At the time of first calling close, any number of 52 * threads (see io->n_thr) may be able to access the io object and the 53 * io object may still be in todo queue or it may be returned by poll. 54 * We need to wait for all these possibilities to flush out. 55 * 56 * We start by deregistering the io from poll and half closing it 57 * so that no more reads are possible (write to send e.g. TLS disconnect 58 * is still possible). As the different threads encounter the io 59 * object unusable, they will decrement io->n_thr and call hi_close(). 60 * 61 * It is important to not fully close(2) the socket as doing so 62 * would allow an accept(2) that would almost certainly use the 63 * same fd number. This would cause the still pending threads 64 * to act on the new connection, which would be a serious error. 65 * 66 * Once the io->n_thr reaches 0, the only possible source of activity 67 * for the fd is that it is returned by poll. Thus we start an end game, 68 * indicated by io->n_thr == HI_IO_N_THR_END_GAME, 69 * where we put to todo queue a poll and then the io object. This 70 * way, if the poll were about to return the io, then it is forced 71 * to do so. After poll, the io is consumed from todo the one last 72 * time and we can safely close(2) the fd. 73 * 74 * By the time we are really ready to close the io, all associated PDUs 75 * have been freed by the respective threads (usually through write 76 * of response freeing both response and request). 77 * 78 * locking:: will take io->qel.mut */ 79 80 /* Called by: hi_in_out x2, hi_read x2, hi_todo_consume, hi_write */ 81 void hi_close(struct hi_thr* hit, struct hi_io* io, const char* lk) 82 { 83 struct hi_pdu* pdu; 84 int fd = io->fd; 85 DD("%s: closing(%x) n_c/t=%d/%d", lk, fd, io->n_close, io->n_thr); 86 LOCK(io->qel.mut, "hi_close"); 87 D("LOCK io(%x)->qel.thr=%lx n_c/t=%d/%d", fd, (long)io->qel.mut.thr, io->n_close, io->n_thr); 88 89 if (fd&0x80000000) { 90 D("%s: 2nd close(%x) n_c/t=%d/%d", lk, fd, io->n_close, io->n_thr); 91 } else { 92 INFO("%s: 1st close(%x) n_c/t=%d/%d", lk, fd, io->n_close, io->n_thr); 93 if (shutdown(fd, SHUT_RD)) 94 ERR("%s: shutdown(%x) %d %s", lk, fd, errno, STRERROR(errno)); 95 hi_del_fd(hit, fd); /* stop poll from returning this fd */ 96 io->fd |= 0x80000000; /* mark as closed */ 97 ASSERTOPI(io->n_thr, >, 0); 98 --io->n_thr; /* Will not be returned by poll any more, thus remove poll "virtual thread" */ 99 } 100 101 ASSERT(io->qel.intodo != HI_INTODO_SHF_FREE); 102 ASSERT(hit->cur_io == io); 103 if (hit->cur_n_close != io->n_close) { 104 ERR("%s: already closed(%x) cur_n_close=%d != n_close=%d",lk,fd,hit->cur_n_close,io->n_close); 105 hit->cur_io = 0; 106 D("UNLOCK io(%x)->qel.thr=%lx", fd, (long)io->qel.mut.thr); 107 UNLOCK(io->qel.mut, "hi_close-already"); 108 return; 109 } 110 111 /* N.B. n_thr manipulations should be done before calling hi_close() */ 112 if (io->n_thr > 0) { 113 D("%s: close-wait(%x) n_c/t=%d/%d intodo=%x", lk, fd, io->n_close, io->n_thr, io->qel.intodo); 114 hit->cur_io = 0; 115 D("UNLOCK io(%x)->qel.thr=%lx", fd, (long)io->qel.mut.thr); 116 UNLOCK(io->qel.mut, "hi_close-wait"); 117 return; 118 } 119 if (io->n_thr == 0) { 120 io->n_thr = HI_IO_N_THR_END_POLL; 121 D("%s: close-poll(%x) n_c/t=%d/%d intodo=%x", lk, fd, io->n_close, io->n_thr, io->qel.intodo); 122 hit->cur_io = 0; 123 D("UNLOCK io(%x)->qel.thr=%lx", fd, (long)io->qel.mut.thr); 124 UNLOCK(io->qel.mut, "hi_close-poll"); 125 hi_todo_produce(hit, &io->qel, "close-poll", 0); /* Trigger 1st poll, see hi_todo_consume() */ 126 return; 127 } 128 if (io->n_thr != HI_IO_N_THR_END_GAME) { 129 ERR("%s: close-n_thr(%x) n_c/t=%d/%d intodo=%x", lk,fd,io->n_close,io->n_thr,io->qel.intodo); 130 ASSERTOPI(io->n_thr, ==, HI_IO_N_THR_END_GAME); 131 hit->cur_io = 0; 132 D("UNLOCK io(%x)->qel.thr=%lx", fd, (long)io->qel.mut.thr); 133 UNLOCK(io->qel.mut, "hi_close-n_thr"); 134 return; 135 } 136 137 /* Now we are ready to really close */ 138 139 D("%s: close-final(%x) n_c/t=%d/%d", lk, io->fd, io->n_close, io->n_thr); 140 141 for (pdu = io->reqs; pdu; pdu = pdu->n) 142 hi_free_req(hit, pdu, "close-reqs "); 143 io->reqs = 0; 144 for (pdu = io->pending; pdu; pdu = pdu->n) 145 hi_free_req(hit, pdu, "close-pend "); 146 io->pending = 0; 147 148 if (io->cur_pdu) { 149 hi_free_req(hit, io->cur_pdu, "close-cur "); 150 io->cur_pdu = hi_pdu_alloc(hit, "cur_pdu-clo"); /* *** Could we recycle the PDU without freeing? */ 151 io->cur_pdu->fe = io; 152 } 153 #ifdef ENA_S5066 154 void sis_clean(struct hi_io* io); 155 sis_clean(io); 156 #endif 157 158 /* Clear the association with entity as late as possible so ACKs may 159 * get a chance of being processed and written. */ 160 if (io->ent) { 161 if (io->ent->io == io) { 162 io->ent->io = 0; 163 /*INFO("Dissociate ent_%p (%s) from io(%x)", io->ent, io->ent->eid, io->fd);*/ 164 INFO("Dissociate ent_%p from io(%x)", io->ent, io->fd); 165 } else { 166 WARN("io(%x)->ent and ent->io(%x) are diff", io->fd, io->ent->io?io->ent->io->fd:-1); 167 } 168 io->ent = 0; 169 } else { 170 ERR("io(%x) has no entity associated", io->fd); 171 } 172 173 #ifdef USE_OPENSSL 174 if (io->ssl) { 175 SSL_shutdown(io->ssl); 176 SSL_free(io->ssl); 177 io->ssl = 0; 178 } 179 #endif 180 ASSERTOPI(io->qel.intodo, ==, HI_INTODO_IOINUSE); /* HI_INTODO_INTODO should not be possible anymore. */ 181 io->qel.intodo = HI_INTODO_SHF_FREE; 182 io->n_thr = 0; 183 ++io->n_close; 184 hit->cur_io = 0; 185 close(io->fd & 0x7ffffff); /* Now some other thread may reuse the slot by accept()ing same fd */ 186 INFO("%s: CLOSED(%x) n_close=%d", lk, io->fd, io->n_close); 187 188 /* Must let go of the lock only after close so no read can creep in. */ 189 D("UNLOCK io(%x)->qel.thr=%lx", fd, (long)io->qel.mut.thr); 190 UNLOCK(io->qel.mut, "hi_close"); 191 } 192 193 /* ---------- shuffler ---------- */ 194 195 /*() For a fd that was consumed from todo, deal with potential reads and writes */ 196 197 /* Called by: hi_shuffle */ 198 void hi_in_out(struct hi_thr* hit, struct hi_io* io) 199 { 200 int reading; 201 202 LOCK(io->qel.mut, "in_out"); 203 D("LOCK io(%x)->qel.thr=%lx r/w=%d/%d ev=%x", io->fd, (long)io->qel.mut.thr, io->reading, io->writing, io->events); 204 if (io->events & (EPOLLHUP | EPOLLERR)) { 205 D("HUP or ERR on fd=%x events=0x%x", io->fd, io->events); 206 close: 207 io->n_thr -= 2; /* Remove both counts (write and read) */ 208 ASSERT(io->n_thr >= 0); 209 UNLOCK(io->qel.mut, "in_out-hup"); 210 hi_close(hit, io, "hi_in_out-hup"); 211 return; 212 } 213 214 /* We must ensure that only one thread is trying to write. The poll may 215 * still report the io as writable after a thread has taken the 216 * task, in that case we want the second thread to skip write and 217 * go process the read. */ 218 if (io->events & EPOLLOUT && !io->writing) { 219 D("OUT fd=%x n_iov=%d n_to_write=%d writing", io->fd, io->n_iov, io->n_to_write); 220 221 /* Although in_write is checked in hi_write() as well, take the opportunity 222 * to check it right here while we already hold the lock. */ 223 if (!io->in_write) /* Need to prepare new iov? */ 224 hi_make_iov_nolock(io); 225 if (io->in_write) { 226 io->writing = 1; 227 D("UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr); 228 UNLOCK(io->qel.mut, "check-writing-enter"); 229 230 if (hi_write(hit, io)) { /* will clear io->writing */ 231 LOCK(io->qel.mut, "n_thr-dec2"); 232 D("IN_OUT: LOCK & UNLOCK io(%x)->qel.thr=%lx closed", io->fd, (long)io->qel.mut.thr); 233 --io->n_thr; /* Remove read count, write count already removed by hi_write() */ 234 ASSERT(io->n_thr >= 0); 235 ASSERT(hit->cur_io == io); 236 ASSERT(hit->cur_n_close == io->n_close); 237 UNLOCK(io->qel.mut, "n_thr-dec2"); 238 hi_close(hit, io, "write-shortcircuit-close"); /* Close again, now n_thr was reduced */ 239 return; /* Write caused close, read will be futile */ 240 } else { 241 LOCK(io->qel.mut, "check-reading"); 242 D("LOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr); 243 } 244 } else { 245 if (io->fd & 0x80000000) { 246 /* Seems it was already a closed one, but due to no write, no opportunity for error. */ 247 D("nothing to write and closed io(%x)->n_thr=%d", io->fd, io->n_thr); 248 goto close; 249 } 250 --io->n_thr; /* Remove write count as no write happened. */ 251 D("no inwrite io(%x)->n_thr=%d", io->fd, io->n_thr); 252 } 253 } else { 254 --io->n_thr; /* Remove write count as no write happened. */ 255 D("no EPOLLOUT io(%x)->n_thr=%d", io->fd, io->n_thr); 256 } 257 ASSERT(io->n_thr > 0 || io->n_thr == HI_IO_N_THR_END_GAME); /* Read cnt should still be there */ 258 io->events &= ~EPOLLOUT; /* Clear poll flag in case we get read rescheduling */ 259 260 if (io->events & EPOLLIN) { 261 /* A special problem with EAGAIN: read(2) is not guaranteed to arm edge triggered epoll(2) 262 * unless at least one EAGAIN read has happened. The problem is that as we are still 263 * in io->reading, if after this EAGAIN another thread polls and consumes from todo, it 264 * will not be able to read due to io->reading even though poll told it to read. After 265 * missing the opportunity, the next poll will not report fd anymore because no read has 266 * happened since previous report. Ouch! 267 * Solution attempt: if read was polled, but could not be served due to io->reading. 268 * the PDU is added back to the todo queue. This may cause the other thread to spin 269 * for a while, but at least things will move on eventually. */ 270 if (!io->reading) { 271 D("IN fd=%x cur_pdu=%p need=%d", io->fd, io->cur_pdu, io->cur_pdu->need); 272 /* Poll says work is possible: sched wk for io if not under wk yet, or cur_pdu needs wk. 273 * The inverse is also important: if io->cur_pdu is set, but pdu->need is not, then someone 274 * is alredy working on decoding the cur_pdu and we should not interfere. */ 275 reading = io->reading = io->cur_pdu->need; /* only place where io->reading is set */ 276 D("UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr); 277 UNLOCK(io->qel.mut, "check-reading"); 278 if (reading) { 279 hi_read(hit, io); /* io->n_thr and hit->cur_io have already been updated */ 280 ASSERT(!hit->cur_io); 281 } else { 282 LOCK(io->qel.mut, "n_thr-dec3"); 283 D("IN_OUT: LOCK & UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr); 284 --io->n_thr; /* Remove read count as no read happened. */ 285 ASSERT(io->n_thr >= 0); 286 ASSERT(hit->cur_io == io); 287 ASSERT(hit->cur_n_close == io->n_close); 288 hit->cur_io = 0; 289 UNLOCK(io->qel.mut, "n_thr-dec3"); 290 } 291 } else { 292 ASSERT(io->n_thr > 0); 293 ASSERT(hit->cur_io == io); 294 ASSERT(hit->cur_n_close == io->n_close); 295 /*--io->n_thr; * Do not decrement. We need to keep n_thr until we are in todo queue. */ 296 hit->cur_io = 0; 297 D("UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr); 298 UNLOCK(io->qel.mut, "n_thr-dec4"); 299 D("resched(%x) to avoid miss poll read n_thr=%d", io->fd, io->n_thr); 300 hi_todo_produce(hit, &io->qel, "reread", 0); /* try again so read poll is not lost */ 301 } 302 } else { 303 --io->n_thr; /* Remove read count as no read happened. */ 304 ASSERT(io->n_thr >= 0); 305 ASSERT(hit->cur_io == io); 306 ASSERT(hit->cur_n_close == io->n_close); 307 hit->cur_io = 0; 308 D("UNLOCK io(%x)->qel.thr=%lx", io->fd, (long)io->qel.mut.thr); 309 UNLOCK(io->qel.mut, "n_thr-dec5"); 310 } 311 } 312 313 /*() Main I/O shuffling loop. Never returns. Main loop of most (all?) threads. */ 314 315 /* Called by: thread_loop, zxbusd_main */ 316 void hi_shuffle(struct hi_thr* hit, struct hiios* shf) 317 { 318 struct hi_qel* qe; 319 hit->shf = shf; 320 LOCK(shf->todo_mut, "add-thread"); 321 hit->n = shf->threads; 322 shf->threads = hit; 323 UNLOCK(shf->todo_mut, "add-thread"); 324 INFO("Start shuffling hit(%p) shf(%p)", hit, shf); 325 hi_sanity_shf(255, shf); 326 while (1) { 327 HI_SANITY(hit->shf, hit); 328 qe = hi_todo_consume(hit); /* Wakes up the heard to receive work. */ 329 switch (qe->kind) { 330 case HI_POLLT: hi_poll(hit); break; 331 case HI_LISTENT: hi_accept(hit, (struct hi_io*)qe); break; 332 case HI_HALF_ACCEPT: hi_accept_book(hit, (struct hi_io*)qe, ((struct hi_io*)qe)->fd); 333 case HI_TCP_C: 334 case HI_TCP_S: hi_in_out(hit, (struct hi_io*)qe); break; 335 case HI_PDU_DIST: stomp_msg_deliver(hit, (struct hi_pdu*)qe); break; 336 #ifdef HAVE_NET_SNMP 337 case HI_SNMP: if (snmp_port) processSNMP(); break; /* *** needs more thought */ 338 #endif 339 default: NEVER("unknown qel->kind 0x%x", qe->kind); 340 } 341 } 342 } 343 344 /* EOF -- hiios.c */ 345