1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #ifndef _NXT_PORT_H_INCLUDED_
8 #define _NXT_PORT_H_INCLUDED_
9
10
11 struct nxt_port_handlers_s {
12 /* RPC responses. */
13 nxt_port_handler_t rpc_ready;
14 nxt_port_handler_t rpc_error;
15
16 /* Main process RPC requests. */
17 nxt_port_handler_t start_process;
18 nxt_port_handler_t socket;
19 nxt_port_handler_t modules;
20 nxt_port_handler_t conf_store;
21 nxt_port_handler_t cert_get;
22 nxt_port_handler_t cert_delete;
23 nxt_port_handler_t access_log;
24
25 /* File descriptor exchange. */
26 nxt_port_handler_t change_file;
27 nxt_port_handler_t new_port;
28 nxt_port_handler_t get_port;
29 nxt_port_handler_t port_ack;
30 nxt_port_handler_t mmap;
31 nxt_port_handler_t get_mmap;
32
33 /* New process */
34 nxt_port_handler_t process_created;
35 nxt_port_handler_t process_ready;
36 nxt_port_handler_t whoami;
37
38 /* Process exit/crash notification. */
39 nxt_port_handler_t remove_pid;
40
41 /* Stop process command. */
42 nxt_port_handler_t quit;
43
44 /* Request headers. */
45 nxt_port_handler_t req_headers;
46 nxt_port_handler_t req_headers_ack;
47 nxt_port_handler_t req_body;
48
49 /* Websocket frame. */
50 nxt_port_handler_t websocket_frame;
51
52 /* Various data. */
53 nxt_port_handler_t data;
54 nxt_port_handler_t app_restart;
55
56 nxt_port_handler_t oosm;
57 nxt_port_handler_t shm_ack;
58 nxt_port_handler_t read_queue;
59 nxt_port_handler_t read_socket;
60 };
61
62
/*
 * Zero-based slot number of the handler member "name" inside
 * nxt_port_handlers_t: the member's byte offset divided by the size of
 * one nxt_port_handler_t.  "name" must be a member designator, so it
 * cannot be parenthesized.
 */
#define nxt_port_handler_idx(name)                                            \
    (offsetof(nxt_port_handlers_t, name) / sizeof(nxt_port_handler_t))
65
/*
 * Message type "handler" with the NXT_PORT_MSG_LAST flag set.
 *
 * The argument is parenthesized so that an expression built from
 * operators binding looser than "|" (for example "a ? b : c" or
 * "x && y") is combined with the flag as a whole.
 */
#define nxt_msg_last(handler)                                                 \
    ((handler) | NXT_PORT_MSG_LAST)
68
69 typedef enum {
70 NXT_PORT_MSG_LAST = 0x100,
71 NXT_PORT_MSG_CLOSE_FD = 0x200,
72 NXT_PORT_MSG_SYNC = 0x400,
73
74 NXT_PORT_MSG_MASK = 0xFF,
75
76 _NXT_PORT_MSG_RPC_READY = nxt_port_handler_idx(rpc_ready),
77 _NXT_PORT_MSG_RPC_ERROR = nxt_port_handler_idx(rpc_error),
78
79 _NXT_PORT_MSG_START_PROCESS = nxt_port_handler_idx(start_process),
80 _NXT_PORT_MSG_SOCKET = nxt_port_handler_idx(socket),
81 _NXT_PORT_MSG_MODULES = nxt_port_handler_idx(modules),
82 _NXT_PORT_MSG_CONF_STORE = nxt_port_handler_idx(conf_store),
83 _NXT_PORT_MSG_CERT_GET = nxt_port_handler_idx(cert_get),
84 _NXT_PORT_MSG_CERT_DELETE = nxt_port_handler_idx(cert_delete),
85 _NXT_PORT_MSG_ACCESS_LOG = nxt_port_handler_idx(access_log),
86
87 _NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file),
88 _NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
89 _NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port),
90 _NXT_PORT_MSG_PORT_ACK = nxt_port_handler_idx(port_ack),
91 _NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
92 _NXT_PORT_MSG_GET_MMAP = nxt_port_handler_idx(get_mmap),
93
94 _NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created),
95 _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready),
96 _NXT_PORT_MSG_WHOAMI = nxt_port_handler_idx(whoami),
97 _NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid),
98 _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit),
99
100 _NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers),
101 _NXT_PORT_MSG_REQ_HEADERS_ACK = nxt_port_handler_idx(req_headers_ack),
102 _NXT_PORT_MSG_REQ_BODY = nxt_port_handler_idx(req_body),
103 _NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame),
104
105 _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
106 _NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart),
107
108 _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
109 _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
110 _NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
111 _NXT_PORT_MSG_READ_SOCKET = nxt_port_handler_idx(read_socket),
112
113 NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t)
114 / sizeof(nxt_port_handler_t),
115
116 NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY,
117 NXT_PORT_MSG_RPC_READY_LAST = nxt_msg_last(_NXT_PORT_MSG_RPC_READY),
118 NXT_PORT_MSG_RPC_ERROR = nxt_msg_last(_NXT_PORT_MSG_RPC_ERROR),
119 NXT_PORT_MSG_START_PROCESS = nxt_msg_last(_NXT_PORT_MSG_START_PROCESS),
120 NXT_PORT_MSG_SOCKET = nxt_msg_last(_NXT_PORT_MSG_SOCKET),
121 NXT_PORT_MSG_MODULES = nxt_msg_last(_NXT_PORT_MSG_MODULES),
122 NXT_PORT_MSG_CONF_STORE = nxt_msg_last(_NXT_PORT_MSG_CONF_STORE),
123 NXT_PORT_MSG_CERT_GET = nxt_msg_last(_NXT_PORT_MSG_CERT_GET),
124 NXT_PORT_MSG_CERT_DELETE = nxt_msg_last(_NXT_PORT_MSG_CERT_DELETE),
125 NXT_PORT_MSG_ACCESS_LOG = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG),
126 NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE),
127 NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
128 NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT),
129 NXT_PORT_MSG_PORT_ACK = nxt_msg_last(_NXT_PORT_MSG_PORT_ACK),
130 NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
131 | NXT_PORT_MSG_SYNC,
132 NXT_PORT_MSG_GET_MMAP = nxt_msg_last(_NXT_PORT_MSG_GET_MMAP),
133
134 NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED),
135 NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY),
136 NXT_PORT_MSG_WHOAMI = nxt_msg_last(_NXT_PORT_MSG_WHOAMI),
137 NXT_PORT_MSG_QUIT = nxt_msg_last(_NXT_PORT_MSG_QUIT),
138 NXT_PORT_MSG_REMOVE_PID = nxt_msg_last(_NXT_PORT_MSG_REMOVE_PID),
139
140 NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS,
141 NXT_PORT_MSG_REQ_BODY = _NXT_PORT_MSG_REQ_BODY,
142 NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET,
143 NXT_PORT_MSG_WEBSOCKET_LAST = nxt_msg_last(_NXT_PORT_MSG_WEBSOCKET),
144
145 NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
146 NXT_PORT_MSG_DATA_LAST = nxt_msg_last(_NXT_PORT_MSG_DATA),
147 NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
148
149 NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
150 NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
151 NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE,
152 NXT_PORT_MSG_READ_SOCKET = _NXT_PORT_MSG_READ_SOCKET,
153 } nxt_port_msg_type_t;
154
155
156 /* Passed as a first iov chunk. */
157 typedef struct {
158 uint32_t stream;
159
160 nxt_pid_t pid; /* not used on Linux and FreeBSD */
161
162 nxt_port_id_t reply_port;
163
164 uint8_t type;
165
166 /* Last message for this stream. */
167 uint8_t last; /* 1 bit */
168
169 /* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */
170 uint8_t mmap; /* 1 bit */
171
172 /* Non-First fragment in fragmented message sequence. */
173 uint8_t nf; /* 1 bit */
174
175 /* More Fragments followed. */
176 uint8_t mf; /* 1 bit */
177
178 /* Message delivery tracking enabled, next chunk is tracking msg. */
179 uint8_t tracking; /* 1 bit */
180 } nxt_port_msg_t;
181
182
183 typedef struct {
184 nxt_queue_link_t link;
185 nxt_buf_t *buf;
186 size_t share;
187 nxt_fd_t fd[2];
188 nxt_port_msg_t port_msg;
189 uint32_t tracking_msg[2];
190 uint8_t close_fd; /* 1 bit */
191 uint8_t allocated; /* 1 bit */
192 } nxt_port_send_msg_t;
193
/*
 * NXT_USE_CMSG_PID is set to 1 when either NXT_HAVE_UCRED or
 * NXT_HAVE_MSGHDR_CMSGCRED is enabled; otherwise it stays undefined.
 */
#if (NXT_HAVE_UCRED) || (NXT_HAVE_MSGHDR_CMSGCRED)
#define NXT_USE_CMSG_PID  1
#endif
197
198 struct nxt_port_recv_msg_s {
199 nxt_fd_t fd[2];
200 nxt_buf_t *buf;
201 nxt_port_t *port;
202 nxt_port_msg_t port_msg;
203 size_t size;
204 #if (NXT_USE_CMSG_PID)
205 nxt_pid_t cmsg_pid;
206 #endif
207 nxt_bool_t cancelled;
208 union {
209 nxt_port_t *new_port;
210 nxt_pid_t removed_pid;
211 void *data;
212 } u;
213 };
214
215
/*
 * Accessors for the sender pid of a received message.
 *
 * With NXT_USE_CMSG_PID the pid lives in the dedicated cmsg_pid member and
 * its address is available.  Without it the accessor reads the pid from
 * the message header (port_msg.pid) and the reference form yields NULL.
 */
#if (NXT_USE_CMSG_PID)

#define nxt_recv_msg_cmsg_pid(msg)      ((msg)->cmsg_pid)
#define nxt_recv_msg_cmsg_pid_ref(msg)  (&(msg)->cmsg_pid)

#else

#define nxt_recv_msg_cmsg_pid(msg)      ((msg)->port_msg.pid)
#define nxt_recv_msg_cmsg_pid_ref(msg)  (NULL)

#endif
223
224 typedef struct nxt_app_s nxt_app_t;
225
226 struct nxt_port_s {
227 nxt_fd_event_t socket;
228
229 nxt_queue_link_t link; /* for nxt_process_t.ports */
230 nxt_process_t *process;
231
232 nxt_queue_link_t app_link; /* for nxt_app_t.ports */
233 nxt_app_t *app;
234 nxt_port_t *main_app_port;
235
236 nxt_queue_link_t idle_link; /* for nxt_app_t.idle_ports */
237 nxt_msec_t idle_start;
238
239 nxt_queue_t messages; /* of nxt_port_send_msg_t */
240 nxt_thread_mutex_t write_mutex;
241
242 /* Maximum size of message part. */
243 uint32_t max_size;
244 /* Maximum interleave of message parts. */
245 uint32_t max_share;
246
247 uint32_t active_websockets;
248 uint32_t active_requests;
249
250 nxt_port_handler_t handler;
251 nxt_port_handler_t *data;
252
253 nxt_mp_t *mem_pool;
254 nxt_event_engine_t *engine;
255
256 nxt_buf_t *free_bufs;
257 nxt_socket_t pair[2];
258
259 nxt_port_id_t id;
260 nxt_pid_t pid;
261
262 nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */
263 nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */
264
265 nxt_lvlhsh_t frags;
266
267 nxt_atomic_t use_count;
268
269 nxt_process_type_t type;
270
271 nxt_fd_t queue_fd;
272 void *queue;
273
274 void *socket_msg;
275 int from_socket;
276 };
277
278
279 typedef struct {
280 nxt_port_id_t id;
281 nxt_pid_t pid;
282 size_t max_size;
283 size_t max_share;
284 nxt_process_type_t type:8;
285 } nxt_port_msg_new_port_t;
286
287
288 typedef struct {
289 nxt_port_id_t id;
290 nxt_pid_t pid;
291 } nxt_port_msg_get_port_t;
292
293
/*
 * A single 32-bit id.
 * NOTE(review): presumably the payload of a "get mmap" message --
 * confirm against the sender.
 */
typedef struct {
    uint32_t  id;
} nxt_port_msg_get_mmap_t;
297
298
299 /*
300 * nxt_port_data_t size is allocation size
301 * which enables effective reuse of memory pool cache.
302 */
303 typedef union {
304 nxt_buf_t buf;
305 nxt_port_msg_new_port_t new_port;
306 } nxt_port_data_t;
307
308
309 typedef void (*nxt_port_post_handler_t)(nxt_task_t *task, nxt_port_t *port,
310 void *data);
311
312 nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
313 nxt_process_type_t type);
314
315 nxt_port_id_t nxt_port_get_next_id(void);
316 void nxt_port_reset_next_id(void);
317
318 nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
319 size_t max_size);
320 void nxt_port_destroy(nxt_port_t *port);
321 void nxt_port_close(nxt_task_t *task, nxt_port_t *port);
322 void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
323 void nxt_port_write_close(nxt_port_t *port);
324 void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
325 void nxt_port_read_close(nxt_port_t *port);
326 nxt_int_t nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port,
327 nxt_uint_t type, nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream,
328 nxt_port_id_t reply_port, nxt_buf_t *b);
329
330 nxt_inline nxt_int_t
nxt_port_socket_write(nxt_task_t * task,nxt_port_t * port,nxt_uint_t type,nxt_fd_t fd,uint32_t stream,nxt_port_id_t reply_port,nxt_buf_t * b)331 nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
332 nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
333 nxt_buf_t *b)
334 {
335 return nxt_port_socket_write2(task, port, type, fd, -1, stream, reply_port,
336 b);
337 }
338
339 void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
340 const nxt_port_handlers_t *handlers);
341 nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
342 nxt_port_t *new_port, uint32_t stream);
343 void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
344 nxt_uint_t slot, nxt_fd_t fd);
345 void nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process);
346
347 void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
348 void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
349 void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
350 void nxt_port_change_log_file_handler(nxt_task_t *task,
351 nxt_port_recv_msg_t *msg);
352 void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
353 void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
354 void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
355 void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
356
357 nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
358 nxt_port_post_handler_t handler, void *data);
359 void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
360
nxt_port_inc_use(nxt_port_t * port)361 nxt_inline void nxt_port_inc_use(nxt_port_t *port)
362 {
363 nxt_atomic_fetch_add(&port->use_count, 1);
364 }
365
366 #endif /* _NXT_PORT_H_INCLUDED_ */
367