1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #ifndef _NXT_CONN_H_INCLUDED_ 8 #define _NXT_CONN_H_INCLUDED_ 9 10 11 typedef ssize_t (*nxt_conn_io_read_t)(nxt_task_t *task, nxt_conn_t *c); 12 typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data); 13 14 15 typedef struct { 16 nxt_work_handler_t ready_handler; 17 nxt_work_handler_t close_handler; 18 nxt_work_handler_t error_handler; 19 20 nxt_conn_io_read_t io_read_handler; 21 22 nxt_work_handler_t timer_handler; 23 nxt_conn_timer_value_t timer_value; 24 uintptr_t timer_data; 25 26 uint8_t timer_autoreset; 27 } nxt_conn_state_t; 28 29 30 typedef struct { 31 double average; 32 size_t limit; 33 size_t limit_after; 34 size_t max_limit; 35 nxt_msec_t last; 36 } nxt_event_write_rate_t; 37 38 39 typedef struct { 40 41 nxt_work_handler_t connect; 42 nxt_work_handler_t accept; 43 44 /* 45 * The read() with NULL c->read buffer waits readiness of a connection 46 * to avoid allocation of read buffer if the connection will time out 47 * or will be closed with error. The kqueue-specific read() can also 48 * detect case if a client did not sent anything and has just closed the 49 * connection without errors. In the latter case state's close_handler 50 * is called. 51 */ 52 nxt_work_handler_t read; 53 54 ssize_t (*recvbuf)(nxt_conn_t *c, nxt_buf_t *b); 55 56 ssize_t (*recv)(nxt_conn_t *c, void *buf, 57 size_t size, nxt_uint_t flags); 58 59 /* The write() is an interface to write a buffer chain. */ 60 nxt_work_handler_t write; 61 62 /* 63 * The sendbuf() is an interface for OS-specific sendfile 64 * implementations or simple writev(). 65 */ 66 ssize_t (*sendbuf)(nxt_task_t *task, 67 nxt_sendbuf_t *sb); 68 /* 69 * The sendbuf() is an interface for OS-specific sendfile 70 * implementations or simple writev(). 71 */ 72 ssize_t (*old_sendbuf)(nxt_conn_t *c, nxt_buf_t *b, 73 size_t limit); 74 /* 75 * The writev() is an interface to write several nxt_iobuf_t buffers. 76 */ 77 ssize_t (*writev)(nxt_conn_t *c, 78 nxt_iobuf_t *iob, nxt_uint_t niob); 79 /* 80 * The send() is an interface to write a single buffer. SSL/TLS 81 * libraries' send() interface handles also the libraries' errors. 82 */ 83 ssize_t (*send)(nxt_conn_t *c, void *buf, 84 size_t size); 85 86 nxt_work_handler_t shutdown; 87 } nxt_conn_io_t; 88 89 90 /* 91 * The nxt_listen_event_t is separated from nxt_listen_socket_t 92 * because nxt_listen_socket_t is one per process whilst each worker 93 * thread uses own nxt_listen_event_t. 94 */ 95 typedef struct { 96 /* Must be the first field. */ 97 nxt_fd_event_t socket; 98 99 nxt_task_t task; 100 101 uint32_t ready; 102 uint32_t batch; 103 uint32_t count; 104 105 /* An accept() interface is cached to minimize memory accesses. */ 106 nxt_work_handler_t accept; 107 108 nxt_listen_socket_t *listen; 109 nxt_conn_t *next; 110 nxt_work_queue_t *work_queue; 111 112 nxt_timer_t timer; 113 114 nxt_queue_link_t link; 115 } nxt_listen_event_t; 116 117 118 struct nxt_conn_s { 119 /* 120 * Must be the first field, since nxt_fd_event_t 121 * and nxt_conn_t are used interchangeably. 122 */ 123 nxt_fd_event_t socket; 124 125 nxt_buf_t *read; 126 const nxt_conn_state_t *read_state; 127 nxt_work_queue_t *read_work_queue; 128 nxt_timer_t read_timer; 129 130 nxt_buf_t *write; 131 const nxt_conn_state_t *write_state; 132 nxt_work_queue_t *write_work_queue; 133 nxt_event_write_rate_t *rate; 134 nxt_timer_t write_timer; 135 136 nxt_off_t sent; 137 uint32_t max_chunk; 138 uint32_t nbytes; 139 140 nxt_conn_io_t *io; 141 142 union { 143 #if (NXT_TLS) 144 void *tls; 145 #endif 146 nxt_thread_pool_t *thread_pool; 147 } u; 148 149 nxt_mp_t *mem_pool; 150 151 nxt_task_t task; 152 nxt_log_t log; 153 154 nxt_listen_event_t *listen; 155 156 nxt_sockaddr_t *remote; 157 nxt_sockaddr_t *local; 158 const char *action; 159 160 uint8_t block_read; /* 1 bit */ 161 uint8_t block_write; /* 1 bit */ 162 uint8_t delayed; /* 1 bit */ 163 164 #define NXT_CONN_SENDFILE_OFF 0 165 #define NXT_CONN_SENDFILE_ON 1 166 #define NXT_CONN_SENDFILE_UNSET 3 167 168 uint8_t sendfile; /* 2 bits */ 169 uint8_t tcp_nodelay; /* 1 bit */ 170 171 nxt_queue_link_t link; 172 }; 173 174 175 #define nxt_conn_timer_init(ev, c, wq) \ 176 do { \ 177 (ev)->work_queue = (wq); \ 178 (ev)->log = &(c)->log; \ 179 (ev)->bias = NXT_TIMER_DEFAULT_BIAS; \ 180 } while (0) 181 182 183 #define nxt_read_timer_conn(ev) \ 184 nxt_timer_data(ev, nxt_conn_t, read_timer) 185 186 187 #define nxt_write_timer_conn(ev) \ 188 nxt_timer_data(ev, nxt_conn_t, write_timer) 189 190 191 #if (NXT_HAVE_UNIX_DOMAIN) 192 193 #define nxt_conn_tcp_nodelay_on(task, c) \ 194 do { \ 195 nxt_int_t ret; \ 196 \ 197 if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) { \ 198 ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \ 199 TCP_NODELAY, 1); \ 200 \ 201 (c)->tcp_nodelay = (ret == NXT_OK); \ 202 } \ 203 } while (0) 204 205 206 #else 207 208 #define nxt_conn_tcp_nodelay_on(task, c) \ 209 do { \ 210 nxt_int_t ret; \ 211 \ 212 ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \ 213 TCP_NODELAY, 1); \ 214 \ 215 (c)->tcp_nodelay = (ret == NXT_OK); \ 216 } while (0) 217 218 #endif 219 220 221 NXT_EXPORT nxt_conn_t *nxt_conn_create(nxt_mp_t *mp, nxt_task_t *task); 222 NXT_EXPORT void nxt_conn_free(nxt_task_t *task, nxt_conn_t *c); 223 NXT_EXPORT void nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c); 224 225 NXT_EXPORT void nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c, 226 const nxt_conn_state_t *state, nxt_timer_t *tev); 227 NXT_EXPORT void nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq); 228 NXT_EXPORT nxt_sockaddr_t *nxt_conn_local_addr(nxt_task_t *task, 229 nxt_conn_t *c); 230 231 void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data); 232 void nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data); 233 nxt_int_t nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c); 234 void nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data); 235 void nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data); 236 237 NXT_EXPORT nxt_listen_event_t *nxt_listen_event(nxt_task_t *task, 238 nxt_listen_socket_t *ls); 239 void nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data); 240 NXT_EXPORT void nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, 241 nxt_conn_t *c); 242 void nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, 243 const char *accept_syscall, nxt_err_t err); 244 245 void nxt_conn_wait(nxt_conn_t *c); 246 247 void nxt_conn_io_read(nxt_task_t *task, void *obj, void *data); 248 ssize_t nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); 249 ssize_t nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, 250 nxt_uint_t flags); 251 252 void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data); 253 ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb); 254 ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, 255 nxt_iobuf_t *iob, nxt_uint_t niob); 256 ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, 257 size_t size); 258 259 size_t nxt_event_conn_write_limit(nxt_conn_t *c); 260 nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine, 261 nxt_conn_t *c, size_t sent); 262 ssize_t nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, 263 nxt_uint_t niob); 264 ssize_t nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size); 265 266 NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, 267 nxt_conn_t *c); 268 269 270 #define nxt_conn_connect(engine, c) \ 271 nxt_work_queue_add(&engine->socket_work_queue, nxt_conn_sys_socket, \ 272 c->socket.task, c, c->socket.data) 273 274 275 #define nxt_conn_read(engine, c) \ 276 do { \ 277 nxt_event_engine_t *e = engine; \ 278 \ 279 c->socket.read_work_queue = &e->read_work_queue; \ 280 \ 281 nxt_work_queue_add(&e->read_work_queue, c->io->read, \ 282 c->socket.task, c, c->socket.data); \ 283 } while (0) 284 285 286 #define nxt_conn_write(engine, c) \ 287 do { \ 288 nxt_event_engine_t *e = engine; \ 289 \ 290 c->socket.write_work_queue = &e->write_work_queue; \ 291 \ 292 nxt_work_queue_add(&e->write_work_queue, c->io->write, \ 293 c->socket.task, c, c->socket.data); \ 294 } while (0) 295 296 297 extern nxt_conn_io_t nxt_unix_conn_io; 298 299 300 typedef struct { 301 /* 302 * Client and peer connections are not embedded because already 303 * existent connections can be switched to the event connection proxy. 304 */ 305 nxt_conn_t *client; 306 nxt_conn_t *peer; 307 nxt_buf_t *client_buffer; 308 nxt_buf_t *peer_buffer; 309 310 size_t client_buffer_size; 311 size_t peer_buffer_size; 312 313 nxt_msec_t client_wait_timeout; 314 nxt_msec_t connect_timeout; 315 nxt_msec_t reconnect_timeout; 316 nxt_msec_t peer_wait_timeout; 317 nxt_msec_t client_write_timeout; 318 nxt_msec_t peer_write_timeout; 319 320 uint8_t connected; /* 1 bit */ 321 uint8_t delayed; /* 1 bit */ 322 uint8_t retries; /* 8 bits */ 323 uint8_t retain; /* 2 bits */ 324 325 nxt_work_handler_t completion_handler; 326 } nxt_conn_proxy_t; 327 328 329 NXT_EXPORT nxt_conn_proxy_t *nxt_conn_proxy_create(nxt_conn_t *c); 330 NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p); 331 332 333 /* STUB */ 334 #define nxt_event_conn_t nxt_conn_t 335 #define nxt_event_conn_state_t nxt_conn_state_t 336 #define nxt_event_conn_proxy_t nxt_conn_proxy_t 337 #define nxt_event_conn_read nxt_conn_read 338 #define nxt_event_conn_write nxt_conn_write 339 #define nxt_event_conn_close nxt_conn_close 340 341 342 #endif /* _NXT_CONN_H_INCLUDED_ */ 343