1 /* hiios.h - Hiquu I/O Engine 2 * Copyright (c) 2006,2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved. 3 * This is confidential unpublished proprietary source code of the author. 4 * NO WARRANTY, not even implied warranties. Contains trade secrets. 5 * Distribution prohibited unless authorized in writing. See file COPYING. 6 * Special grant: hiios.h may be used with zxid open source project under 7 * same licensing terms as zxid itself. 8 * $Id$ 9 * 10 * 15.4.2006, created over Easter holiday --Sampo 11 * 23.4.2006, DTS specific enhancements --Sampo 12 * 16.8.2012, modified license grant to allow use with ZXID.org --Sampo 13 * 14 * A shuffler (hiios) is the top most global object, containing all 15 * the connection objects and original global PDU memory pool. 16 * Each thread has a shuffler, but also a local pool of PDU memory 17 * that can be accessed without locking. 18 * 19 * PDU is always somehow reachable through pdu->qel.n (next) pointer, but 20 * membership in queue is mutually exclusive as follows 21 * 1. shf->free_pdus -- global memalloc pool (shf->pdus is backing store) 22 * 2. hit->free_pdus -- per thread free list, for allocation within a thread 23 * 3. shf->todo_consume -- The todo list, marking that some polling needs to be done 24 * 25 * PDU also has its own next pointer, the pdu->n which is used to keep lists of active PDUs 26 * A. io->reqs -- linked list of real reqs of this session, protect by qel.mut 27 * B. io->cur_pdu -- not really a list, but PDU can be here after allocation and before reqs 28 * C. pdu->reals -- linked list of real resps to this req 29 * D. pdu->synths -- linked list of subreqs and synth resps (not yet used as of 2012) 30 * 31 * Additionally a PDU may participate in various write related queues using wn (write next) pointer 32 * i. io->to_write_produce -- add new pdus here (main thr only) 33 * ii. io->to_write_consume -- list of PDUs that are imminently going to be written 34 * iii. io->in_write -- list of pdus that are in process of being written (have iovs) 35 * iv. io->subresps -- subreq: list of resps, to ds_wait() upon 36 * 37 * The PDUs in the to_write queue have pdu->wn pointing from consume towards produce: 38 * 39 * to_write_produce --> pdu3 pdu2 pdu1 <-- to_write_consume 40 * wn:0 <----wn <----wn 41 * 42 * Here pdu1 was inserted first, then pdu2, etc. Inserts happen at produce 43 * end (and set the wn pointer of the previous head to point to new head) 44 * and removals at consumer end (by chasing the wn pointer). Here's queue with one element 45 * 46 * to_write_produce --> pdu1 <---------------------- to_write_consume 47 * wn:0 48 * 49 * Empty queue is expressed by both pointer being null. 50 * 51 * to_write_produce:0 to_write_consume:0 52 * 53 * Threading strategy: 54 * 55 * An I/O object needs to "belong" to single thread for duration of a 56 * polled I/O activity: if a thread responsd to poll for read, it needs 57 * to maintain control of the io object until it has read and decoded 58 * the PDU to a point where it moves from io->cur_pdu to io->reqs. After 59 * this another thread may be enabled to read another PDU from the 60 * socket. 61 * 62 * Alternatively, if the decode state of a PDU is stored in a thread 63 * safe way, the current thread may relinguish control when it sets pdu->need 64 * to indicate that further data needs to be read. At this point, some other 65 * thread may actually perform the additional read and finish the decoding. 66 * 67 * In general, after decoding PDU, the thread should not hang on to 68 * the I/O object even if it continues to perform the payload function 69 * of the PDU, as a worker thread. Thus same thread is expected to 70 * transform from an I/O thread to worker thread on the fly. If the 71 * payload processing produces response or subrequest, in principle 72 * the additional processing can go through the poll, but there does 73 * not seem to be much harm in "short circuiting" this process by 74 * having same worker thread assume the I/O thread role for sending 75 * the response or subrequest, provided that the destination I/O 76 * object is not occupied by another thread. If it is, the short 77 * circuiting can not be done and the response or subrequest must go 78 * through write poll. 79 * 80 * The basic mechanism to avoid other thread squatting on I/O object is to 81 * ensure that it is not scheduled to poll or todo list while exclusive 82 * access is desired. 83 * 84 * Lock ordering 85 * 1. shf->todo_mut 86 * 2. shf->todo_cond 87 * 3. io->qel.mut 88 * 1. io->qel.mut e.g. hi_close() calling hi_pdu_alloc() or hi_pdu_free() 89 * 2. shf->pdu_mut 90 * 1. shf->ent_mut 91 * 2. io->qel.mut 92 * 93 * io->reading and io->writing flags are used to ensure that only single 94 * thread will be doing the I/O at a time (one thread for read, other for 95 * write is allowed). In addition to this, we need to ensure that 96 * fd is not completely closed while any thread may still be 97 * using it. This is accomplished using the n_thr counter. 98 * 99 * n_thr is incremented in hi_todo_consume() or upon short circuit 100 * write. It is decremented either on end of write, end of read, 101 * or in hi_in_out() if nothing was done. 102 * 103 * See http://pl.atyp.us/content/tech/servers.html for inspiration on threading strategy. 104 * http://www.kegel.com/c10k.html 105 */ 106 107 #ifndef _hiios_h 108 #define _hiios_h 109 110 #ifdef LINUX 111 #include <sys/epoll.h> /* See man 4 epoll (Linux 2.6) */ 112 #endif 113 #ifdef SUNOS 114 #include <sys/devpoll.h> /* See man -s 7d poll (Solaris 8) */ 115 #include <sys/poll.h> 116 #endif 117 #ifdef USE_OPENSSL 118 #include <openssl/ssl.h> 119 #endif 120 121 #include <netinet/in.h> 122 #include <sys/uio.h> 123 #include <pthread.h> 124 125 #include "hiproto.h" 126 127 struct hi_lock { 128 pthread_mutex_t ptmut; 129 const char* func; /* Remember where we locked to ease debugging. */ 130 int line; 131 pthread_t thr; 132 }; 133 134 #ifndef IOV_MAX 135 #define IOV_MAX 16 136 #endif 137 #define HI_N_IOV (IOV_MAX < 32 ? IOV_MAX : 32) /* Avoid unreasonably huge iov */ 138 #if 0 139 #define HI_PDU_MEM 2200 /* Default PDU memory buffer size, sufficient for reliable data */ 140 #define HI_PDU_MEM 4200 /* Default PDU memory buffer size, sufficient for broadcast data */ 141 #endif 142 #define HI_PDU_MEM 3072 /* Default PDU memory buffer size, for log lines */ 143 144 /* qel.kind constants */ 145 #define HI_POLLT 1 /* Trigger epoll */ 146 #define HI_LISTENT 2 /* Listening socket for TCP */ 147 #define HI_HALF_ACCEPT 3 /* Accepted at TCP, but delayed booking due to threads expecting old dead connection. */ 148 #define HI_TCP_S 4 /* TCP server socket, i.e. accept(2)'d from listening socket */ 149 #define HI_TCP_C 5 /* TCP client socket, i.e. formed using connect(2) */ 150 #define HI_SNMP 6 /* SNMP (UDP) socket */ 151 #define HI_PDU_DIST 7 /* PDU with intent to deliver STOMP message */ 152 153 /* qel.intodo constants */ 154 #define HI_INTODO_SHF_FREE 0 /* in shuffler free queue (PDU or IO) */ 155 #define HI_INTODO_HIT_FREE 1 /* in thread free queue */ 156 #define HI_INTODO_INTODO 2 /* intodo queue */ 157 #define HI_INTODO_IOINUSE 3 /* IO in use */ 158 #define HI_INTODO_PDUINUSE 4 /* PDU in use */ 159 160 #define HI_IO_N_THR_END_GAME (-3) /* Special io->n_thr value to indicate close end game. */ 161 #define HI_IO_N_THR_END_POLL (-7) /* Special io->n_thr value to indicate close end game. */ 162 163 struct hi_qel { /* hiios task queue element. This is the 1st thing on io and pdu objects */ 164 struct hi_qel* n; /* Next in todo_queue for IOs or in free_pdus. */ 165 struct hi_lock mut; 166 char kind; 167 char proto; /* See HIPROTO_* constants */ 168 char intodo; /* Flag indicating object (io or pdu) is in shf->todo_consume queue */ 169 char pad3; 170 }; 171 172 /*(s) Connection object */ 173 174 struct hi_io { 175 struct hi_qel qel; /* Next in todo_queue for IOs or in free_pdus. */ 176 struct hi_io* n; /* next among io objects, esp. backends */ 177 struct hi_io* pair; /* the other half of a proxy connection */ 178 struct hi_ent* ent; /* Login entity associated with connection */ 179 int fd; /* file descriptor (socket), or 0x80000000 flag if not in use */ 180 char events; /* events from last poll */ 181 char n_iov; 182 char writing; /* Flag, protected by io->qel.mut, that indicates that some thread is processing a write on the io object. */ 183 char reading; /* Flag, protected by io->qel.mut, that indicates that some thread is processing a read on the io object. */ 184 struct iovec* iov_cur; /* not used by listeners, only used for writev by sessions and backend ses */ 185 struct iovec iov[HI_N_IOV]; 186 int n_thr; /* num threads using this io, lock io->qel.mut */ 187 int n_to_write; /* length of to_write queue */ 188 struct hi_pdu* in_write; /* wn list of pdus that are in process of being written (have iovs) */ 189 struct hi_pdu* to_write_consume; /* wn list of PDUs that are imminently going to be written */ 190 struct hi_pdu* to_write_produce; /* wn add new pdus here (main thr only) */ 191 192 /* Statistics counters */ 193 int n_close; /* Number of closes. Generation counter. */ 194 int n_written; /* bytes */ 195 int n_read; /* bytes */ 196 int n_pdu_out; 197 int n_pdu_in; 198 199 struct hi_pdu* cur_pdu; /* PDU for which we currently expect to do read I/O */ 200 struct hi_pdu* reqs; /* n linked list of real reqs of this session, protect by qel.mut */ 201 struct hi_pdu* pending; /* n linked list of requests sent to client and pending response, protect by qel.mut */ 202 union { 203 struct dts_conn* dts; 204 int sap; /* S5066 SAP ID, indexes into saptab[] and svc_type_tab[] */ 205 struct { 206 struct hi_pdu* uni_ind_hmtp; 207 int state; 208 } smtp; 209 struct { 210 int msgid; 211 } stomp; 212 } ad; /* Application specific data */ 213 #ifdef USE_OPENSSL 214 SSL* ssl; 215 #endif 216 }; 217 218 struct hi_ad_stomp { 219 int len; /* Populated from content-length header, if one is supplied. */ 220 char* body; /* Body of the message */ 221 char* dest; /* destination, also heart_bt, zx_rcpt_sig */ 222 char* host; /* also receipt and receipt_id */ 223 char* vers; /* version, also accept-version, tx_id */ 224 char* login; /* also session, subs_id, subsc */ 225 char* pw; /* also server, ack, msg_id */ 226 }; 227 228 /*(s) PDU object */ 229 230 struct hi_pdu { 231 struct hi_qel qel; 232 struct hi_pdu* n; /* Next among requests or responses */ 233 struct hi_pdu* wn; /* Write next. Used by in_write, to_write, and subresps queues. */ 234 struct hi_io* fe; /* Frontend of the PDU, e.g. where req was read from. */ 235 236 struct hi_pdu* req; /* Set for response to indicate which request it is response to. */ 237 struct hi_pdu* parent; /* Set for sub-requests and -responses */ 238 239 struct hi_pdu* subresps; /* wn subreq: list of resps, to ds_wait() upon */ 240 struct hi_pdu* reals; /* pdu->n linked list of real resps to this req */ 241 struct hi_pdu* synths; /* pdu->n linked list of subreqs and synth resps */ 242 243 short color; /* Coloring flag for integrity tests, e.g. to detect circular ptrs */ 244 char events; /* events needed by this PDU (EPOLLIN or EPOLLOUT) */ 245 char n_iov; 246 struct iovec iov[3]; /* Enough for header, payload, and CRC */ 247 248 int need; /* How much more is needed to complete a PDU? Also final length. */ 249 char* scan; /* How far has protocol parsin progressed, e.g. in SMTP. */ 250 char* ap; /* Allocation pointer: next free memory location */ 251 char* m; /* Beginning of memory (often m == mem, but could be malloc'd) */ 252 char* lim; /* One past end of memory */ 253 char mem[HI_PDU_MEM]; /* Memory for processing a PDU */ 254 255 union { 256 #ifdef ENA_S5066 257 struct { 258 int n_tx_seq; /* Transmit Frame Sequence Number */ 259 int addr_len; 260 char* c_pdu; /* S5066 DTS segmented C_PDU */ 261 } dts; 262 struct { 263 char rx_map[SIS_MAX_PDU_SIZE/8]; /* bitmap of bytes rx'd so we know if we have rx'd all */ 264 } dtsrx; 265 #endif 266 struct { 267 char* skip_ehlo; 268 } smtp; 269 struct hi_ad_stomp stomp; 270 struct { 271 int len; /* Body length. */ 272 char* body; /* Body of the message */ 273 char* dest; /* destination, also heart_bt */ 274 int ack_fd; /* File where acks are collected. */ 275 int acks; /* Ack counter for delivery. */ 276 int nacks; /* Nack counter: incontactables for delivery. */ 277 } delivb; 278 } ad; /* Application specific data */ 279 }; 280 281 #if 0 282 struct c_pdu_buf; 283 #endif 284 285 /*(s) Main shuffler object. 286 * Principal function is to hold epoll_event array and todo list. 287 * Secondary function is to be memory pool of last resort. */ 288 289 struct hiios { 290 int ep; /* epoll(4) (Linux 2.6) or /dev/poll (Solaris 8, man -s 7d poll) file descriptor */ 291 int n_evs; /* how many useful events last epoll_wait() returned */ 292 int max_evs; 293 #ifdef LINUX 294 struct epoll_event* evs; 295 #endif 296 #ifdef SUNOS 297 struct pollfd* evs; 298 #endif 299 #if defined(MACOSX) || defined(FREEBSD) 300 struct kevent* evs; 301 #endif 302 //int n_ios; 303 int max_ios; /* Size of ios array = maximum number of fds */ 304 struct hi_io* ios; /* Dynamically allocated array of io objects, one per fd. */ 305 306 struct hi_lock pdu_mut; 307 int max_pdus; 308 struct hi_pdu* pdu_buf_blob; /* Backingstore for the PDU pool (big blob) */ 309 struct hi_pdu* free_pdus; /* Global pool of PDUs (linked list) */ 310 311 #if 0 312 struct hi_lock c_pdu_buf_mut; 313 int max_c_pdu_bufs; 314 struct c_pdu_buf* c_pdu_bufs; /* global pool for c_pdu buffers */ 315 struct c_pdu_buf* free_c_pdu_bufs; 316 #endif 317 318 struct hi_lock todo_mut; 319 pthread_cond_t todo_cond; 320 struct hi_qel* todo_consume; /* PDUs and I/O objects that need processing */ 321 struct hi_qel* todo_produce; 322 int n_todo; 323 struct hi_qel poll_tok; /* Special qel to be inserted in todo_consume to trigger poll. */ 324 325 int nthr; /* Number of threads referencing this shf */ 326 struct hi_thr* threads; /* List of threads. */ 327 struct hi_lock ent_mut; 328 struct hi_ent* ents; /* List of subscribing entities */ 329 int max_chs; /* Maximum number of channels */ 330 struct hi_ch* chs; /* Array of channels */ 331 332 char anonlogin; /* Config: whether anonymous login is ok. */ 333 char res1; 334 char res2; 335 char res3; 336 #ifdef USE_OPENSSL 337 SSL_CTX* ssl_ctx; 338 #endif 339 }; 340 341 /*(s) Thread object */ 342 343 struct hi_thr { 344 struct hi_thr* n; 345 struct hiios* shf; 346 struct hi_io* cur_io; /* Only valid for HI_TCP_S and HI_TCP_C */ 347 int cur_n_close; /* Generation value of the current io */ 348 int n_free_pdus; 349 struct hi_pdu* free_pdus; /* Per thread pool of PDUs */ 350 #if 0 351 struct c_pdu_buf* free_c_pdu_bufs; 352 #endif 353 pthread_t self; 354 }; 355 356 struct hi_host_spec { 357 struct hi_host_spec* next; 358 struct sockaddr_in sin; 359 int proto; 360 char* specstr; 361 struct hi_io* conns; 362 }; 363 364 struct hi_proto { 365 char name[8]; 366 int default_port; 367 int is_tls; 368 struct hi_host_spec* specs; 369 }; 370 371 extern struct hi_proto hi_prototab[]; 372 373 /*(s) Channel or destination designation object */ 374 375 struct hi_ch { 376 char* dest; 377 }; 378 379 /*(s) Node for linked list of PDUs acknowledged by the entity */ 380 381 struct hi_ack { 382 struct hi_ack* n; 383 struct hi_pdu* pdu; 384 }; 385 386 /*(s) Entity or subscriber object. Typically loaded from /var/bus/.ents */ 387 388 struct hi_ent { 389 struct hi_ent* n; 390 char* eid; /* EntityID as seen in STOMP 1.1 login header */ 391 struct hi_io* io; 392 char* chs; /* Subscribed channels as an array of char */ 393 struct hi_ack* acks; 394 }; 395 396 #define HI_NOSUBS 0 397 #define HI_SUBS 1 /* Subscribed, but not logged in. */ 398 #define HI_SUBS_ON 2 /* Subscribed and logged in with subscribe message */ 399 #define HI_SUBS_PEND 3 /* Subscribed and messages pending. */ 400 401 /* External APIs */ 402 403 void hi_hit_init(struct hi_thr* hit); 404 struct hiios* hi_new_shuffler(struct hi_thr* hit, int nfd, int npdu, int nch, int nthr); 405 struct hi_io* hi_open_listener(struct hiios* shf, struct hi_host_spec* hs, int proto); 406 struct hi_io* hi_open_tcp(struct hi_thr* hit, struct hi_host_spec* hs, int proto); 407 struct hi_io* hi_add_fd(struct hi_thr* hit, struct hi_io* io, int fd, int kind); 408 void hi_del_fd(struct hi_thr* hit, int fd); 409 int hi_vfy_peer_ssl_cred(struct hi_thr* hit, struct hi_io* io, const char* eid); 410 411 struct hi_pdu* hi_pdu_alloc(struct hi_thr* hit, const char* lk); 412 void hi_send(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* parent, struct hi_pdu* req, struct hi_pdu* resp); 413 void hi_send1(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* parent, struct hi_pdu* req, struct hi_pdu* resp, int len0, char* d0); 414 void hi_send2(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* parent, struct hi_pdu* req, struct hi_pdu* resp, int len0, char* d0, int len1, char* d1); 415 void hi_send3(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* parent, struct hi_pdu* req, struct hi_pdu* resp, int len0, char* d0, int len1, char* d1, int len2, char* d2); 416 void hi_sendf(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* parent, struct hi_pdu* req, char* fmt, ...); 417 void hi_todo_produce(struct hi_thr* hit, struct hi_qel* qe, const char* lk, int from_poll); 418 void hi_shuffle(struct hi_thr* hit, struct hiios* shf); 419 420 /* Internal APIs */ 421 422 #define HI_NOERR 0 423 #define HI_CONN_CLOSE 1 424 #define HI_NEED_MORE 2 425 426 void hi_in_out(struct hi_thr* hit, struct hi_io* io); 427 void hi_close(struct hi_thr* hit, struct hi_io* io, const char* lk); 428 int hi_write(struct hi_thr* hit, struct hi_io* io); 429 int hi_read(struct hi_thr* hit, struct hi_io* io); 430 void hi_accept(struct hi_thr* hit, struct hi_io* listener); 431 void hi_accept_book(struct hi_thr* hit, struct hi_io* io, int fd); 432 void hi_poll(struct hi_thr* hit); 433 434 struct hi_qel* hi_todo_consume(struct hi_thr* hit); 435 436 void hi_free_resp(struct hi_thr* hit, struct hi_pdu* resp, const char* lk1); 437 void hi_free_req(struct hi_thr* hit, struct hi_pdu* pdu, const char* lk1); 438 void hi_del_from_reqs(struct hi_io* io, struct hi_pdu* req); 439 void hi_add_to_reqs(struct hi_thr* hit, struct hi_io* io, struct hi_pdu* req, int minlen); 440 void hi_make_iov_nolock(struct hi_io* io); 441 442 /* Sanity checking and data structure dumping for debugging. */ 443 444 extern short hi_color; /* color used for data structure circularity checks */ 445 446 int hi_dump(struct hiios* shf); 447 int hi_sanity_pdu(int mode, struct hi_pdu* root_pdu); 448 int hi_sanity_io(int mode, struct hi_io* root_io); 449 int hi_sanity_hit(int mode, struct hi_thr* root_hit); 450 int hi_sanity_shf(int mode, struct hiios* root_shf); 451 int hi_sanity(int mode, struct hiios* root_shf, struct hi_thr* root_hit, const char* fn, int line); 452 453 #define HI_SANITY(shf, hit) if (errmac_debug>2) hi_sanity(255, (shf), (hit), __FUNCTION__, __LINE__) 454 #define DHI_SANITY(shf, hit) /* disabled */ 455 456 #endif /* _hiios_h */ 457