1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #ifndef _NXT_CONN_H_INCLUDED_
8 #define _NXT_CONN_H_INCLUDED_
9 
10 
11 typedef ssize_t (*nxt_conn_io_read_t)(nxt_task_t *task, nxt_conn_t *c);
12 typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data);
13 
14 
15 typedef struct {
16     nxt_work_handler_t            ready_handler;
17     nxt_work_handler_t            close_handler;
18     nxt_work_handler_t            error_handler;
19 
20     nxt_conn_io_read_t            io_read_handler;
21 
22     nxt_work_handler_t            timer_handler;
23     nxt_conn_timer_value_t        timer_value;
24     uintptr_t                     timer_data;
25 
26     uint8_t                       timer_autoreset;
27 } nxt_conn_state_t;
28 
29 
30 typedef struct {
31     double                        average;
32     size_t                        limit;
33     size_t                        limit_after;
34     size_t                        max_limit;
35     nxt_msec_t                    last;
36 } nxt_event_write_rate_t;
37 
38 
39 typedef struct {
40 
41     nxt_work_handler_t            connect;
42     nxt_work_handler_t            accept;
43 
44     /*
45      * The read() with NULL c->read buffer waits readiness of a connection
46      * to avoid allocation of read buffer if the connection will time out
47      * or will be closed with error.  The kqueue-specific read() can also
48      * detect case if a client did not sent anything and has just closed the
49      * connection without errors.  In the latter case state's close_handler
50      * is called.
51      */
52     nxt_work_handler_t            read;
53 
54     ssize_t                       (*recvbuf)(nxt_conn_t *c, nxt_buf_t *b);
55 
56     ssize_t                       (*recv)(nxt_conn_t *c, void *buf,
57                                       size_t size, nxt_uint_t flags);
58 
59     /* The write() is an interface to write a buffer chain. */
60     nxt_work_handler_t            write;
61 
62     /*
63      * The sendbuf() is an interface for OS-specific sendfile
64      * implementations or simple writev().
65      */
66     ssize_t                       (*sendbuf)(nxt_task_t *task,
67                                        nxt_sendbuf_t *sb);
68     /*
69      * The sendbuf() is an interface for OS-specific sendfile
70      * implementations or simple writev().
71      */
72     ssize_t                       (*old_sendbuf)(nxt_conn_t *c, nxt_buf_t *b,
73                                       size_t limit);
74     /*
75      * The writev() is an interface to write several nxt_iobuf_t buffers.
76      */
77     ssize_t                       (*writev)(nxt_conn_t *c,
78                                       nxt_iobuf_t *iob, nxt_uint_t niob);
79     /*
80      * The send() is an interface to write a single buffer.  SSL/TLS
81      * libraries' send() interface handles also the libraries' errors.
82      */
83     ssize_t                       (*send)(nxt_conn_t *c, void *buf,
84                                       size_t size);
85 
86     nxt_work_handler_t            shutdown;
87 } nxt_conn_io_t;
88 
89 
90 /*
91  * The nxt_listen_event_t is separated from nxt_listen_socket_t
92  * because nxt_listen_socket_t is one per process whilst each worker
93  * thread uses own nxt_listen_event_t.
94  */
95 typedef struct {
96     /* Must be the first field. */
97     nxt_fd_event_t                socket;
98 
99     nxt_task_t                    task;
100 
101     uint32_t                      ready;
102     uint32_t                      batch;
103     uint32_t                      count;
104 
105     /* An accept() interface is cached to minimize memory accesses. */
106     nxt_work_handler_t            accept;
107 
108     nxt_listen_socket_t           *listen;
109     nxt_conn_t                    *next;
110     nxt_work_queue_t              *work_queue;
111 
112     nxt_timer_t                   timer;
113 
114     nxt_queue_link_t              link;
115 } nxt_listen_event_t;
116 
117 
118 struct nxt_conn_s {
119     /*
120      * Must be the first field, since nxt_fd_event_t
121      * and nxt_conn_t are used interchangeably.
122      */
123     nxt_fd_event_t                socket;
124 
125     nxt_buf_t                     *read;
126     const nxt_conn_state_t        *read_state;
127     nxt_work_queue_t              *read_work_queue;
128     nxt_timer_t                   read_timer;
129 
130     nxt_buf_t                     *write;
131     const nxt_conn_state_t        *write_state;
132     nxt_work_queue_t              *write_work_queue;
133     nxt_event_write_rate_t        *rate;
134     nxt_timer_t                   write_timer;
135 
136     nxt_off_t                     sent;
137     uint32_t                      max_chunk;
138     uint32_t                      nbytes;
139 
140     nxt_conn_io_t                 *io;
141 
142     union {
143 #if (NXT_TLS)
144         void                      *tls;
145 #endif
146         nxt_thread_pool_t         *thread_pool;
147     } u;
148 
149     nxt_mp_t                      *mem_pool;
150 
151     nxt_task_t                    task;
152     nxt_log_t                     log;
153 
154     nxt_listen_event_t            *listen;
155 
156     nxt_sockaddr_t                *remote;
157     nxt_sockaddr_t                *local;
158     const char                    *action;
159 
160     uint8_t                       block_read;   /* 1 bit */
161     uint8_t                       block_write;  /* 1 bit */
162     uint8_t                       delayed;      /* 1 bit */
163 
164 #define NXT_CONN_SENDFILE_OFF     0
165 #define NXT_CONN_SENDFILE_ON      1
166 #define NXT_CONN_SENDFILE_UNSET   3
167 
168     uint8_t                       sendfile;     /* 2 bits */
169     uint8_t                       tcp_nodelay;  /* 1 bit */
170 
171     nxt_queue_link_t              link;
172 };
173 
174 
175 #define nxt_conn_timer_init(ev, c, wq)                                        \
176     do {                                                                      \
177         (ev)->work_queue = (wq);                                              \
178         (ev)->log = &(c)->log;                                                \
179         (ev)->bias = NXT_TIMER_DEFAULT_BIAS;                                  \
180     } while (0)
181 
182 
183 #define nxt_read_timer_conn(ev)                                               \
184     nxt_timer_data(ev, nxt_conn_t, read_timer)
185 
186 
187 #define nxt_write_timer_conn(ev)                                              \
188     nxt_timer_data(ev, nxt_conn_t, write_timer)
189 
190 
191 #if (NXT_HAVE_UNIX_DOMAIN)
192 
193 #define nxt_conn_tcp_nodelay_on(task, c)                                      \
194     do {                                                                      \
195         nxt_int_t  ret;                                                       \
196                                                                               \
197         if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) {                   \
198             ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP,    \
199                                         TCP_NODELAY, 1);                      \
200                                                                               \
201             (c)->tcp_nodelay = (ret == NXT_OK);                               \
202         }                                                                     \
203     } while (0)
204 
205 
206 #else
207 
208 #define nxt_conn_tcp_nodelay_on(task, c)                                      \
209     do {                                                                      \
210         nxt_int_t  ret;                                                       \
211                                                                               \
212         ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP,        \
213                                     TCP_NODELAY, 1);                          \
214                                                                               \
215         (c)->tcp_nodelay = (ret == NXT_OK);                                   \
216     } while (0)
217 
218 #endif
219 
220 
221 NXT_EXPORT nxt_conn_t *nxt_conn_create(nxt_mp_t *mp, nxt_task_t *task);
222 NXT_EXPORT void nxt_conn_free(nxt_task_t *task, nxt_conn_t *c);
223 NXT_EXPORT void nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c);
224 
225 NXT_EXPORT void nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c,
226     const nxt_conn_state_t *state, nxt_timer_t *tev);
227 NXT_EXPORT void nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq);
228 NXT_EXPORT nxt_sockaddr_t *nxt_conn_local_addr(nxt_task_t *task,
229     nxt_conn_t *c);
230 
231 void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data);
232 void nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data);
233 nxt_int_t nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c);
234 void nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data);
235 void nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data);
236 
237 NXT_EXPORT nxt_listen_event_t *nxt_listen_event(nxt_task_t *task,
238     nxt_listen_socket_t *ls);
239 void nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data);
240 NXT_EXPORT void nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev,
241     nxt_conn_t *c);
242 void nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
243     const char *accept_syscall, nxt_err_t err);
244 
245 void nxt_conn_wait(nxt_conn_t *c);
246 
247 void nxt_conn_io_read(nxt_task_t *task, void *obj, void *data);
248 ssize_t nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
249 ssize_t nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size,
250     nxt_uint_t flags);
251 
252 void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data);
253 ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb);
254 ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb,
255     nxt_iobuf_t *iob, nxt_uint_t niob);
256 ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf,
257     size_t size);
258 
259 size_t nxt_event_conn_write_limit(nxt_conn_t *c);
260 nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine,
261     nxt_conn_t *c, size_t sent);
262 ssize_t nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob,
263     nxt_uint_t niob);
264 ssize_t nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size);
265 
266 NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
267     nxt_conn_t *c);
268 
269 
270 #define nxt_conn_connect(engine, c)                                           \
271     nxt_work_queue_add(&engine->socket_work_queue, nxt_conn_sys_socket,       \
272                        c->socket.task, c, c->socket.data)
273 
274 
275 #define nxt_conn_read(engine, c)                                              \
276     do {                                                                      \
277         nxt_event_engine_t  *e = engine;                                      \
278                                                                               \
279         c->socket.read_work_queue = &e->read_work_queue;                      \
280                                                                               \
281         nxt_work_queue_add(&e->read_work_queue, c->io->read,                  \
282                            c->socket.task, c, c->socket.data);                \
283     } while (0)
284 
285 
286 #define nxt_conn_write(engine, c)                                             \
287     do {                                                                      \
288         nxt_event_engine_t  *e = engine;                                      \
289                                                                               \
290         c->socket.write_work_queue = &e->write_work_queue;                    \
291                                                                               \
292         nxt_work_queue_add(&e->write_work_queue, c->io->write,                \
293                            c->socket.task, c, c->socket.data);                \
294     } while (0)
295 
296 
297 extern nxt_conn_io_t             nxt_unix_conn_io;
298 
299 
300 typedef struct {
301     /*
302      * Client and peer connections are not embedded because already
303      * existent connections can be switched to the event connection proxy.
304      */
305     nxt_conn_t                   *client;
306     nxt_conn_t                   *peer;
307     nxt_buf_t                    *client_buffer;
308     nxt_buf_t                    *peer_buffer;
309 
310     size_t                       client_buffer_size;
311     size_t                       peer_buffer_size;
312 
313     nxt_msec_t                   client_wait_timeout;
314     nxt_msec_t                   connect_timeout;
315     nxt_msec_t                   reconnect_timeout;
316     nxt_msec_t                   peer_wait_timeout;
317     nxt_msec_t                   client_write_timeout;
318     nxt_msec_t                   peer_write_timeout;
319 
320     uint8_t                      connected;  /* 1 bit */
321     uint8_t                      delayed;    /* 1 bit */
322     uint8_t                      retries;    /* 8 bits */
323     uint8_t                      retain;     /* 2 bits */
324 
325     nxt_work_handler_t           completion_handler;
326 } nxt_conn_proxy_t;
327 
328 
329 NXT_EXPORT nxt_conn_proxy_t *nxt_conn_proxy_create(nxt_conn_t *c);
330 NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p);
331 
332 
333 /* STUB */
334 #define nxt_event_conn_t         nxt_conn_t
335 #define nxt_event_conn_state_t   nxt_conn_state_t
336 #define nxt_event_conn_proxy_t   nxt_conn_proxy_t
337 #define nxt_event_conn_read      nxt_conn_read
338 #define nxt_event_conn_write     nxt_conn_write
339 #define nxt_event_conn_close     nxt_conn_close
340 
341 
342 #endif /* _NXT_CONN_H_INCLUDED_ */
343