1 /* hiios.h  -  Hiquu I/O Engine
2  * Copyright (c) 2006,2012 Sampo Kellomaki (sampo@iki.fi), All Rights Reserved.
3  * This is confidential unpublished proprietary source code of the author.
4  * NO WARRANTY, not even implied warranties. Contains trade secrets.
5  * Distribution prohibited unless authorized in writing. See file COPYING.
6  * Special grant: hiios.h may be used with zxid open source project under
7  * same licensing terms as zxid itself.
8  * $Id$
9  *
10  * 15.4.2006, created over Easter holiday --Sampo
11  * 23.4.2006, DTS specific enhancements --Sampo
12  * 16.8.2012, modified license grant to allow use with ZXID.org --Sampo
13  *
14  * A shuffler (hiios) is the top most global object, containing all
15  * the connection objects and original global PDU memory pool.
16  * Each thread has a shuffler, but also a local pool of PDU memory
17  * that can be accessed without locking.
18  *
19  * PDU is always somehow reachable through pdu->qel.n (next) pointer, but
20  * membership in queue is mutually exclusive as follows
 * 1. shf->free_pdus    -- global memalloc pool (shf->pdu_buf_blob is backing store)
22  * 2. hit->free_pdus    -- per thread free list, for allocation within a thread
23  * 3. shf->todo_consume -- The todo list, marking that some polling needs to be done
24  *
25  * PDU also has its own next pointer, the pdu->n which is used to keep lists of active PDUs
26  * A. io->reqs        -- linked list of real reqs of this session, protect by qel.mut
27  * B. io->cur_pdu     -- not really a list, but PDU can be here after allocation and before reqs
28  * C. pdu->reals      -- linked list of real resps to this req
29  * D. pdu->synths     -- linked list of subreqs and synth resps (not yet used as of 2012)
30  *
31  * Additionally a PDU may participate in various write related queues using wn (write next) pointer
32  * i.   io->to_write_produce  -- add new pdus here (main thr only)
33  * ii.  io->to_write_consume  -- list of PDUs that are imminently going to be written
34  * iii. io->in_write          -- list of pdus that are in process of being written (have iovs)
 * iv.  pdu->subresps         -- subreq: list of resps, to ds_wait() upon
36  *
37  * The PDUs in the to_write queue have pdu->wn pointing from consume towards produce:
38  *
39  * to_write_produce --> pdu3      pdu2      pdu1 <-- to_write_consume
40  *                      wn:0 <----wn   <----wn
41  *
42  * Here pdu1 was inserted first, then pdu2, etc. Inserts happen at produce
43  * end (and set the wn pointer of the previous head to point to new head)
44  * and removals at consumer end (by chasing the wn pointer). Here's queue with one element
45  *
46  * to_write_produce --> pdu1 <---------------------- to_write_consume
47  *                      wn:0
48  *
 * Empty queue is expressed by both pointers being null.
50  *
51  * to_write_produce:0                                to_write_consume:0
52  *
53  * Threading strategy:
54  *
 * An I/O object needs to "belong" to a single thread for the duration of a
 * polled I/O activity: if a thread responds to poll for read, it needs
 * to maintain control of the io object until it has read and decoded
 * the PDU to a point where it moves from io->cur_pdu to io->reqs. After
 * this another thread may be enabled to read another PDU from the
 * socket.
61  *
 * Alternatively, if the decode state of a PDU is stored in a thread
 * safe way, the current thread may relinquish control when it sets pdu->need
 * to indicate that further data needs to be read. At this point, some other
 * thread may actually perform the additional read and finish the decoding.
66  *
67  * In general, after decoding PDU, the thread should not hang on to
68  * the I/O object even if it continues to perform the payload function
69  * of the PDU, as a worker thread.  Thus same thread is expected to
70  * transform from an I/O thread to worker thread on the fly. If the
71  * payload processing produces response or subrequest, in principle
72  * the additional processing can go through the poll, but there does
73  * not seem to be much harm in "short circuiting" this process by
74  * having same worker thread assume the I/O thread role for sending
75  * the response or subrequest, provided that the destination I/O
76  * object is not occupied by another thread. If it is, the short
77  * circuiting can not be done and the response or subrequest must go
78  * through write poll.
79  *
80  * The basic mechanism to avoid other thread squatting on I/O object is to
81  * ensure that it is not scheduled to poll or todo list while exclusive
82  * access is desired.
83  *
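 * Lock ordering (the numbering restarts for each separate chain; within a
 * chain the locks are listed in the order in which they are to be taken)
 * Chain A:
 * 1. shf->todo_mut
 * 2. shf->todo_cond
 * 3. io->qel.mut
 * Chain B:
 * 1. io->qel.mut    e.g. hi_close() calling hi_pdu_alloc() or hi_pdu_free()
 * 2. shf->pdu_mut
 * Chain C:
 * 1. shf->ent_mut
 * 2. io->qel.mut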
92  *
93  * io->reading and io->writing flags are used to ensure that only single
94  * thread will be doing the I/O at a time (one thread for read, other for
95  * write is allowed). In addition to this, we need to ensure that
96  * fd is not completely closed while any thread may still be
97  * using it. This is accomplished using the n_thr counter.
98  *
99  * n_thr is incremented in hi_todo_consume() or upon short circuit
100  * write. It is decremented either on end of write, end of read,
101  * or in hi_in_out() if nothing was done.
102  *
103  * See http://pl.atyp.us/content/tech/servers.html for inspiration on threading strategy.
104  * http://www.kegel.com/c10k.html
105  */
106 
107 #ifndef _hiios_h
108 #define _hiios_h
109 
110 #ifdef LINUX
111 #include <sys/epoll.h>     /* See man 4 epoll (Linux 2.6) */
112 #endif
113 #ifdef SUNOS
114 #include <sys/devpoll.h>   /* See man -s 7d poll (Solaris 8) */
115 #include <sys/poll.h>
116 #endif
117 #ifdef USE_OPENSSL
118 #include <openssl/ssl.h>
119 #endif
120 
121 #include <netinet/in.h>
122 #include <sys/uio.h>
123 #include <pthread.h>
124 
125 #include "hiproto.h"
126 
/* Mutex wrapper that additionally records where the lock was taken,
 * which eases debugging of locking problems. */
struct hi_lock {
  pthread_mutex_t ptmut;   /* The underlying pthread mutex */
  const char *func;        /* Function name noted at the locking site (debug aid) */
  int line;                /* Source line noted at the locking site (debug aid) */
  pthread_t thr;           /* NOTE(review): presumably the thread that took the lock -- confirm */
};
133 
/* Length of the iovec arrays. IOV_MAX is used when it is at most 32, otherwise
 * the length is capped at 32 to avoid an unreasonably huge iov. When no header
 * seen so far supplies IOV_MAX, 16 is assumed.
 * NOTE(review): the value therefore depends on whether a header defining
 * IOV_MAX was included before this point -- confirm every translation unit
 * that includes this file ends up with the same HI_N_IOV. */
#if !defined(IOV_MAX)
# define IOV_MAX 16
#endif
#define HI_N_IOV (IOV_MAX >= 32 ? 32 : IOV_MAX)

/* Default PDU memory buffer size, chosen for log lines. Alternatives that
 * were considered earlier: 2200 (sufficient for reliable data) and
 * 4200 (sufficient for broadcast data). */
#define HI_PDU_MEM 3072
143 
/* Values for qel.kind: what sort of object the queue element is. */
#define HI_POLLT       1  /* Token that triggers epoll */
#define HI_LISTENT     2  /* TCP listening socket */
#define HI_HALF_ACCEPT 3  /* Accepted at TCP level, but booking was delayed because threads still expect the old dead connection */
#define HI_TCP_S       4  /* TCP server socket, i.e. obtained by accept(2) from a listening socket */
#define HI_TCP_C       5  /* TCP client socket, i.e. created with connect(2) */
#define HI_SNMP        6  /* SNMP (UDP) socket */
#define HI_PDU_DIST    7  /* PDU carrying the intent to deliver a STOMP message */
152 
/* Values for qel.intodo: which queue or state the object (PDU or IO) is in. */
#define HI_INTODO_SHF_FREE 0  /* On the shuffler free queue (PDU or IO) */
#define HI_INTODO_HIT_FREE 1  /* On a thread's free queue */
#define HI_INTODO_INTODO   2  /* On the todo queue */
#define HI_INTODO_IOINUSE  3  /* IO object is in use */
#define HI_INTODO_PDUINUSE 4  /* PDU is in use */
159 
/* Special negative io->n_thr values used while a connection is being closed. */
#define HI_IO_N_THR_END_GAME (-3)  /* Marks the close end game */
#define HI_IO_N_THR_END_POLL (-7)  /* Also described as close end game. NOTE(review): how this differs from END_GAME is not visible here -- confirm */
162 
163 struct hi_qel {         /* hiios task queue element. This is the 1st thing on io and pdu objects */
164   struct hi_qel* n;     /* Next in todo_queue for IOs or in free_pdus. */
165   struct hi_lock mut;
166   char kind;
167   char proto;           /* See HIPROTO_* constants */
168   char intodo;          /* Flag indicating object (io or pdu) is in shf->todo_consume queue */
169   char pad3;
170 };
171 
172 /*(s) Connection object */
173 
174 struct hi_io {
175   struct hi_qel qel;         /* Next in todo_queue for IOs or in free_pdus. */
176   struct hi_io* n;           /* next among io objects, esp. backends */
177   struct hi_io* pair;        /* the other half of a proxy connection */
178   struct hi_ent* ent;        /* Login entity associated with connection */
179   int fd;                    /* file descriptor (socket), or 0x80000000 flag if not in use */
180   char events;               /* events from last poll */
181   char n_iov;
182   char writing;              /* Flag, protected by io->qel.mut, that indicates that some thread is processing a write on the io object. */
183   char reading;              /* Flag, protected by io->qel.mut, that indicates that some thread is processing a read on the io object. */
184   struct iovec* iov_cur;     /* not used by listeners, only used for writev by sessions and backend ses */
185   struct iovec iov[HI_N_IOV];
186   int n_thr;                 /* num threads using this io, lock io->qel.mut */
187   int n_to_write;            /* length of to_write queue */
188   struct hi_pdu* in_write;   /* wn list of pdus that are in process of being written (have iovs) */
189   struct hi_pdu* to_write_consume;  /* wn list of PDUs that are imminently going to be written */
190   struct hi_pdu* to_write_produce;  /* wn add new pdus here (main thr only) */
191 
192   /* Statistics counters */
193   int n_close;               /* Number of closes. Generation counter. */
194   int n_written;             /* bytes */
195   int n_read;                /* bytes */
196   int n_pdu_out;
197   int n_pdu_in;
198 
199   struct hi_pdu* cur_pdu;    /* PDU for which we currently expect to do read I/O */
200   struct hi_pdu* reqs;       /* n linked list of real reqs of this session, protect by qel.mut */
201   struct hi_pdu* pending;    /* n linked list of requests sent to client and pending response, protect by qel.mut */
202   union {
203     struct dts_conn* dts;
204     int sap;                 /* S5066 SAP ID, indexes into saptab[] and svc_type_tab[] */
205     struct {
206       struct hi_pdu* uni_ind_hmtp;
207       int state;
208     } smtp;
209     struct {
210       int msgid;
211     } stomp;
212   } ad;                      /* Application specific data */
213 #ifdef USE_OPENSSL
214   SSL* ssl;
215 #endif
216 };
217 
/* STOMP specific per-PDU data. Several of the pointer fields are reused for
 * more than one header, as listed on each field. */
struct hi_ad_stomp {
  int len;               /* Taken from the content-length header when one is supplied */
  char *body;            /* Message body */
  char *dest;            /* destination; also used for heart_bt, zx_rcpt_sig */
  char *host;            /* host; also used for receipt and receipt_id */
  char *vers;            /* version; also used for accept-version, tx_id */
  char *login;           /* login; also used for session, subs_id, subsc */
  char *pw;              /* password; also used for server, ack, msg_id */
};
227 
228 /*(s) PDU object */
229 
230 struct hi_pdu {
231   struct hi_qel qel;
232   struct hi_pdu* n;          /* Next among requests or responses */
233   struct hi_pdu* wn;         /* Write next. Used by in_write, to_write, and subresps queues. */
234   struct hi_io* fe;          /* Frontend of the PDU, e.g. where req was read from. */
235 
236   struct hi_pdu* req;        /* Set for response to indicate which request it is response to. */
237   struct hi_pdu* parent;     /* Set for sub-requests and -responses */
238 
239   struct hi_pdu* subresps;   /* wn subreq: list of resps, to ds_wait() upon */
240   struct hi_pdu* reals;      /* pdu->n linked list of real resps to this req */
241   struct hi_pdu* synths;     /* pdu->n linked list of subreqs and synth resps */
242 
243   short color;               /* Coloring flag for integrity tests, e.g. to detect circular ptrs */
244   char events;               /* events needed by this PDU (EPOLLIN or EPOLLOUT) */
245   char n_iov;
246   struct iovec iov[3];       /* Enough for header, payload, and CRC */
247 
248   int need;                  /* How much more is needed to complete a PDU? Also final length. */
249   char* scan;                /* How far has protocol parsin progressed, e.g. in SMTP. */
250   char* ap;                  /* Allocation pointer: next free memory location */
251   char* m;                   /* Beginning of memory (often m == mem, but could be malloc'd) */
252   char* lim;                 /* One past end of memory */
253   char mem[HI_PDU_MEM];      /* Memory for processing a PDU */
254 
255   union {
256 #ifdef ENA_S5066
257     struct {
258       int n_tx_seq;          /* Transmit Frame Sequence Number */
259       int addr_len;
260       char* c_pdu;           /* S5066 DTS segmented C_PDU */
261     } dts;
262     struct {
263       char rx_map[SIS_MAX_PDU_SIZE/8];  /* bitmap of bytes rx'd so we know if we have rx'd all */
264     } dtsrx;
265 #endif
266     struct {
267       char* skip_ehlo;
268     } smtp;
269     struct hi_ad_stomp stomp;
270     struct {
271       int len;               /* Body length. */
272       char* body;            /* Body of the message */
273       char* dest;            /* destination, also heart_bt */
274       int ack_fd;            /* File where acks are collected. */
275       int acks;              /* Ack counter for delivery. */
276       int nacks;             /* Nack counter: incontactables for delivery. */
277     } delivb;
278   } ad;                      /* Application specific data */
279 };
280 
281 #if 0
282 struct c_pdu_buf;
283 #endif
284 
285 /*(s) Main shuffler object.
286  * Principal function is to hold epoll_event array and todo list.
287  * Secondary function is to be memory pool of last resort. */
288 
289 struct hiios {
290   int ep;       /* epoll(4) (Linux 2.6) or /dev/poll (Solaris 8, man -s 7d poll) file descriptor */
291   int n_evs;    /* how many useful events last epoll_wait() returned */
292   int max_evs;
293 #ifdef LINUX
294   struct epoll_event* evs;
295 #endif
296 #ifdef SUNOS
297   struct pollfd* evs;
298 #endif
299 #if defined(MACOSX) || defined(FREEBSD)
300   struct kevent* evs;
301 #endif
302   //int n_ios;
303   int max_ios;                  /* Size of ios array = maximum number of fds */
304   struct hi_io* ios;            /* Dynamically allocated array of io objects, one per fd. */
305 
306   struct hi_lock pdu_mut;
307   int max_pdus;
308   struct hi_pdu* pdu_buf_blob;  /* Backingstore for the PDU pool (big blob) */
309   struct hi_pdu* free_pdus;     /* Global pool of PDUs (linked list) */
310 
311 #if 0
312   struct hi_lock c_pdu_buf_mut;
313   int max_c_pdu_bufs;
314   struct c_pdu_buf* c_pdu_bufs; /* global pool for c_pdu buffers */
315   struct c_pdu_buf* free_c_pdu_bufs;
316 #endif
317 
318   struct hi_lock todo_mut;
319   pthread_cond_t todo_cond;
320   struct hi_qel* todo_consume;  /* PDUs and I/O objects that need processing */
321   struct hi_qel* todo_produce;
322   int n_todo;
323   struct hi_qel poll_tok;       /* Special qel to be inserted in todo_consume to trigger poll. */
324 
325   int nthr;                     /* Number of threads referencing this shf */
326   struct hi_thr* threads;       /* List of threads. */
327   struct hi_lock ent_mut;
328   struct hi_ent* ents;          /* List of subscribing entities */
329   int max_chs;                  /* Maximum number of channels */
330   struct hi_ch* chs;            /* Array of channels */
331 
332   char anonlogin;               /* Config: whether anonymous login is ok. */
333   char res1;
334   char res2;
335   char res3;
336 #ifdef USE_OPENSSL
337   SSL_CTX* ssl_ctx;
338 #endif
339 };
340 
/*(s) Thread object. Holds the thread's reference to the shuffler and its
 * private pool of free PDUs. */

struct hi_thr {
  struct hi_thr *n;
  struct hiios *shf;
  struct hi_io *cur_io;         /* Valid only for HI_TCP_S and HI_TCP_C */
  int cur_n_close;              /* Generation value of the current io */
  int n_free_pdus;
  struct hi_pdu *free_pdus;     /* Per thread pool of PDUs */
#if 0
  struct c_pdu_buf *free_c_pdu_bufs;
#endif
  pthread_t self;
};
355 
/* Host specification: a socket address plus the protocol to use with it.
 * Specifications are chained through next. */
struct hi_host_spec {
  struct hi_host_spec *next;
  struct sockaddr_in sin;
  int proto;
  char *specstr;           /* NOTE(review): presumably the textual spec this entry came from -- confirm */
  struct hi_io *conns;
};
363 
/* Entry of the protocol table: protocol name, its default port, a TLS flag
 * and the list of host specifications for the protocol. */
struct hi_proto {
  char name[8];
  int default_port;
  int is_tls;
  struct hi_host_spec *specs;
};

/* The protocol table itself; defined elsewhere. */
extern struct hi_proto hi_prototab[];
372 
/*(s) Channel object, i.e. the designation of a destination */

struct hi_ch {
  char *dest;
};
378 
/*(s) Linked list node recording a PDU that the entity has acknowledged */

struct hi_ack {
  struct hi_ack *n;
  struct hi_pdu *pdu;
};
385 
/*(s) Entity (subscriber) object. These are typically loaded from /var/bus/.ents */

struct hi_ent {
  struct hi_ent *n;
  char *eid;           /* EntityID, as it appears in the STOMP 1.1 login header */
  struct hi_io *io;
  char *chs;           /* Subscribed channels, kept as an array of char */
  struct hi_ack *acks;
};
395 
/* Subscription states.
 * NOTE(review): presumably these are the values stored per channel in the
 * chs array of an entity -- confirm against the users of that array. */
#define HI_NOSUBS    0
#define HI_SUBS      1  /* Subscribed, but not logged in */
#define HI_SUBS_ON   2  /* Subscribed, and logged in with a subscribe message */
#define HI_SUBS_PEND 3  /* Subscribed, with messages pending */
400 
/* External APIs */

/* Thread and shuffler setup; opening and registering of endpoints. */
void hi_hit_init(struct hi_thr *hit);
struct hiios *hi_new_shuffler(struct hi_thr *hit, int nfd, int npdu, int nch, int nthr);
struct hi_io *hi_open_listener(struct hiios *shf, struct hi_host_spec *hs, int proto);
struct hi_io *hi_open_tcp(struct hi_thr *hit, struct hi_host_spec *hs, int proto);
struct hi_io *hi_add_fd(struct hi_thr *hit, struct hi_io *io, int fd, int kind);
void hi_del_fd(struct hi_thr *hit, int fd);
int hi_vfy_peer_ssl_cred(struct hi_thr *hit, struct hi_io *io, const char *eid);

/* PDU allocation and sending. The hi_send1/2/3 variants take one, two or
 * three (length, data) pairs; hi_sendf takes a format string with variable
 * arguments. */
struct hi_pdu *hi_pdu_alloc(struct hi_thr *hit, const char *lk);
void hi_send(struct hi_thr *hit, struct hi_io *io, struct hi_pdu *parent,
             struct hi_pdu *req, struct hi_pdu *resp);
void hi_send1(struct hi_thr *hit, struct hi_io *io, struct hi_pdu *parent,
              struct hi_pdu *req, struct hi_pdu *resp,
              int len0, char *d0);
void hi_send2(struct hi_thr *hit, struct hi_io *io, struct hi_pdu *parent,
              struct hi_pdu *req, struct hi_pdu *resp,
              int len0, char *d0, int len1, char *d1);
void hi_send3(struct hi_thr *hit, struct hi_io *io, struct hi_pdu *parent,
              struct hi_pdu *req, struct hi_pdu *resp,
              int len0, char *d0, int len1, char *d1, int len2, char *d2);
void hi_sendf(struct hi_thr *hit, struct hi_io *io, struct hi_pdu *parent,
              struct hi_pdu *req, char *fmt, ...);

/* Todo queue insertion and the main shuffle entry point. */
void hi_todo_produce(struct hi_thr *hit, struct hi_qel *qe, const char *lk, int from_poll);
void hi_shuffle(struct hi_thr *hit, struct hiios *shf);
419 
/* Internal APIs */

/* Status codes.
 * NOTE(review): presumably the values returned by hi_read() and hi_write() --
 * confirm against their implementations. */
#define HI_NOERR      0
#define HI_CONN_CLOSE 1
#define HI_NEED_MORE  2

/* I/O processing on a connection object. */
void hi_in_out(struct hi_thr *hit, struct hi_io *io);
void hi_close(struct hi_thr *hit, struct hi_io *io, const char *lk);
int  hi_write(struct hi_thr *hit, struct hi_io *io);
int  hi_read(struct hi_thr *hit, struct hi_io *io);
void hi_accept(struct hi_thr *hit, struct hi_io *listener);
void hi_accept_book(struct hi_thr *hit, struct hi_io *io, int fd);
void hi_poll(struct hi_thr *hit);

/* Todo queue removal. */
struct hi_qel *hi_todo_consume(struct hi_thr *hit);

/* PDU bookkeeping: freeing, membership in io->reqs, and iov construction. */
void hi_free_resp(struct hi_thr *hit, struct hi_pdu *resp, const char *lk1);
void hi_free_req(struct hi_thr *hit, struct hi_pdu *pdu, const char *lk1);
void hi_del_from_reqs(struct hi_io *io, struct hi_pdu *req);
void hi_add_to_reqs(struct hi_thr *hit, struct hi_io *io, struct hi_pdu *req, int minlen);
void hi_make_iov_nolock(struct hi_io *io);
441 
/* Debugging aids: sanity checks on the data structures, and dumping them. */

extern short hi_color;  /* Color value used by the data structure circularity checks */

int hi_dump(struct hiios *shf);
int hi_sanity_pdu(int mode, struct hi_pdu *root_pdu);
int hi_sanity_io(int mode, struct hi_io *root_io);
int hi_sanity_hit(int mode, struct hi_thr *root_hit);
int hi_sanity_shf(int mode, struct hiios *root_shf);
/* fn and line identify the call site; the HI_SANITY() macro passes
 * __FUNCTION__ and __LINE__ for them. */
int hi_sanity(int mode, struct hiios *root_shf, struct hi_thr *root_hit,
              const char *fn, int line);
452 
/* Run the complete sanity check (mode 255) when errmac_debug exceeds 2,
 * passing the function name and line of the call site.
 * The body is wrapped in do { } while (0) so that the macro expands to exactly
 * one statement: a bare if without else would capture the else branch of a
 * surrounding if at the call site. errmac_debug must be visible where the
 * macro is expanded. Use as a statement, followed by a semicolon. */
#define HI_SANITY(shf, hit) \
  do { \
    if (errmac_debug>2) \
      hi_sanity(255, (shf), (hit), __FUNCTION__, __LINE__); \
  } while (0)

/* Compiled-out variant of HI_SANITY(): expands to nothing. */
#define DHI_SANITY(shf, hit) /* disabled */
455 
456 #endif /* _hiios_h */
457