1 /*
2 +----------------------------------------------------------------------+
3 | Swoole |
4 +----------------------------------------------------------------------+
5 | This source file is subject to version 2.0 of the Apache license, |
6 | that is bundled with this package in the file LICENSE, and is |
7 | available through the world-wide-web at the following url: |
8 | http://www.apache.org/licenses/LICENSE-2.0.html |
9 | If you did not receive a copy of the Apache2.0 license and are unable|
10 | to obtain it through the world-wide-web, please send a note to |
11 | license@php.net so we can mail you a copy immediately. |
12 +----------------------------------------------------------------------+
13 | Author: Tianfeng Han <mikan.tenny@gmail.com> |
14 +----------------------------------------------------------------------+
15 */
16
17 #pragma once
18
19 #include "swoole_api.h"
20 #include "swoole_string.h"
21 #include "swoole_socket.h"
22 #include "swoole_timer.h"
23 #include "swoole_reactor.h"
24 #include "swoole_signal.h"
25 #include "swoole_protocol.h"
26 #include "swoole_process_pool.h"
27 #include "swoole_pipe.h"
28 #include "swoole_channel.h"
29
30 #ifdef SW_USE_OPENSSL
31 #include "swoole_dtls.h"
32 #endif
33
34 #ifdef __MACH__
35 #include <sys/syslimits.h>
36 #endif
37
38 #include <string>
39 #include <queue>
40 #include <thread>
41 #include <mutex>
42 #include <atomic>
43 #include <unordered_map>
44 #include <unordered_set>
45
46 //------------------------------------Server-------------------------------------------
47 namespace swoole {
48
49 namespace http_server {
50 struct Request;
51 }
52
53 class Server;
54 struct Manager;
55
/**
 * Session record: a session id together with the fd and reactor id stored for it.
 */
struct Session {
    SessionId id;
    int fd;
    // reactor_id (8 bits) and reserve_ (24 bits) are bit-fields that together fill 32 bits.
    uint32_t reactor_id : 8;
    uint32_t reserve_ : 24;
};
62
/**
 * Per-connection record. Server looks these up by fd through connection_list[fd].
 */
struct Connection {
    /**
     * Must be the first member. A value of 0 means that the connection does not exist.
     * One-writer / multiple-reader access is thread-safe.
     * For system fds (e.g. signalfd, listen socket) it must be 0.
     */
    uint8_t active;
    SocketType socket_type;
    int fd;
    int worker_id;
    SessionId session_id;
    //--------------------------------------------------------------
#ifdef SW_USE_OPENSSL
    uint8_t ssl;
    uint8_t ssl_ready;
#endif
    uint8_t overflow;
    uint8_t high_watermark;
    uint8_t http_upgrade;
#ifdef SW_USE_HTTP2
    uint8_t http2_stream;
#endif
#ifdef SW_HAVE_ZLIB
    uint8_t websocket_compression;
#endif
    // If it is equal to 1, it means the server actively closed the connection
    uint8_t close_actively;
    uint8_t closed;
    uint8_t close_queued;
    uint8_t closing;
    uint8_t close_reset;
    uint8_t peer_closed;
    // protected connection: do not close it on a receive/send timeout
    uint8_t protect;
    uint8_t close_notify;
    uint8_t close_force;
    ReactorId reactor_id;
    uint16_t close_errno;
    // fd of the listening socket; Server::get_port_by_fd() uses it to find the ListenPort
    int server_fd;
    sw_atomic_t recv_queued_bytes;
    uint32_t send_queued_bytes;
    uint16_t waiting_time;
    TimerNode *timer;
    /**
     * socket address
     */
    network::Address info;
    /**
     * Opaque pointer reserved for internal use; applications must not use it.
     * Server casts it to ListenPort* (get_port_by_server_fd) or Pipe* (get_pipe_object).
     */
    void *object;
    /**
     * socket, only operated in the main process
     */
    network::Socket *socket;
    /**
     * connect/recv/send/dispatch timestamps
     */
    double connect_time;
    double last_recv_time;
    double last_send_time;
    double last_dispatch_time;
    /**
     * bound uid
     */
    uint32_t uid;
    /**
     * websocket upgrade status
     */
    uint8_t websocket_status;
    /**
     * unfinished data frame
     */
    String *websocket_buffer;

#ifdef SW_USE_OPENSSL
    String *ssl_client_cert;
    uint16_t ssl_client_cert_pid;
#endif
    sw_atomic_t lock;
};
144
145 struct PipeBuffer {
146 DataHead info;
147 char data[0];
148
is_beginPipeBuffer149 bool is_begin() {
150 return info.flags & SW_EVENT_DATA_BEGIN;
151 }
152
is_chunkedPipeBuffer153 bool is_chunked() {
154 return info.flags & SW_EVENT_DATA_CHUNK;
155 }
156
is_endPipeBuffer157 bool is_end() {
158 return info.flags & SW_EVENT_DATA_END;
159 }
160 };
161
162 //------------------------------------Packet-------------------------------------------
/**
 * Length/pointer pair. MessageBus::pass() copies one of these into PipeBuffer::data
 * and sets the SW_EVENT_DATA_PTR flag, so the payload travels by address.
 */
struct PacketPtr {
    size_t length;
    char *data;
};
167
/**
 * Datagram header (socket type, peer address, payload length) followed by the payload.
 */
struct DgramPacket {
    SocketType socket_type;
    network::Address socket_addr;
    uint32_t length;
    // zero-length trailing array marking the start of the payload
    char data[0];
};
174
/**
 * Task packet descriptor: a length plus a fixed-size character buffer.
 * NOTE(review): presumably `tmpfile` holds the path of a temporary file carrying the
 * task payload and `length` is that payload's size; confirm against the task code.
 */
struct PacketTask {
    size_t length;
    char tmpfile[SW_TASK_TMP_PATH_SIZE];
};
179
180 class MessageBus {
181 private:
182 const Allocator *allocator_;
183 std::unordered_map<uint64_t, std::shared_ptr<String>> packet_pool_;
184 std::function<uint64_t(void)> id_generator_;
185 size_t buffer_size_;
186 PipeBuffer *buffer_ = nullptr;
187 bool always_chunked_transfer_ = false;
188
189 String *get_packet_buffer();
190 ReturnCode prepare_packet(uint16_t &recv_chunk_count, String *packet_buffer);
191
192 public:
MessageBus()193 MessageBus() {
194 allocator_ = sw_std_allocator();
195 buffer_size_ = SW_BUFFER_SIZE_STD;
196 }
197
~MessageBus()198 ~MessageBus() {
199 allocator_->free(buffer_);
200 }
201
empty()202 bool empty() {
203 return packet_pool_.empty();
204 }
205
clear()206 void clear() {
207 packet_pool_.clear();
208 }
209
set_allocator(const Allocator * allocator)210 void set_allocator(const Allocator *allocator) {
211 allocator_ = allocator;
212 }
213
set_id_generator(const std::function<uint64_t (void)> & id_generator)214 void set_id_generator(const std::function<uint64_t(void)> &id_generator) {
215 id_generator_ = id_generator;
216 }
217
set_buffer_size(size_t buffer_size)218 void set_buffer_size(size_t buffer_size) {
219 buffer_size_ = buffer_size;
220 }
221
set_always_chunked_transfer()222 void set_always_chunked_transfer() {
223 always_chunked_transfer_ = true;
224 }
225
get_buffer_size()226 size_t get_buffer_size() {
227 return buffer_size_;
228 }
229
230 size_t get_memory_size();
231
alloc_buffer()232 bool alloc_buffer() {
233 void *_ptr = allocator_->malloc(sizeof(*buffer_) + buffer_size_);
234 if (_ptr) {
235 buffer_ = (PipeBuffer *) _ptr;
236 sw_memset_zero(&buffer_->info, sizeof(buffer_->info));
237 return true;
238 } else {
239 return false;
240 }
241 }
242
pass(SendData * task)243 void pass(SendData *task) {
244 memcpy(&buffer_->info, &task->info, sizeof(buffer_->info));
245 if (task->info.len > 0) {
246 buffer_->info.flags = SW_EVENT_DATA_PTR;
247 PacketPtr pkt{task->info.len, (char *) task->data};
248 buffer_->info.len = sizeof(pkt);
249 memcpy(buffer_->data, &pkt, sizeof(pkt));
250 }
251 }
252
253 /**
254 * Send data to socket. If the data sent is larger than Server::ipc_max_size, then it is sent in chunks.
255 * Otherwise send it directly.
256 * @return: send success returns true, send failure returns false.
257 */
258 bool write(network::Socket *sock, SendData *packet);
259 /**
260 * Receive data from socket, if only one chunk is received, packet will be saved in packet_pool.
261 * Then continue to listen to readable events, waiting for more chunks.
262 * @return: >0: receive a complete packet, 0: continue to wait for data, -1: an error occurred
263 */
264 ssize_t read(network::Socket *sock);
265 /**
266 * Receive data from pipeline, and store data to buffer
267 * @return: >0: receive a complete packet, 0: continue to wait for data, -1: an error occurred
268 */
269 ssize_t read_with_buffer(network::Socket *sock);
270 /**
271 * The last chunk of data has been received, return address and length, start processing this packet.
272 */
273 PacketPtr get_packet() const;
get_buffer()274 PipeBuffer *get_buffer() {
275 return buffer_;
276 }
277 /**
278 * Pop the data memory address to the outer layer, no longer managed by MessageBus
279 */
move_packet()280 char *move_packet() {
281 uint64_t msg_id = buffer_->info.msg_id;
282 auto iter = packet_pool_.find(msg_id);
283 if (iter != packet_pool_.end()) {
284 auto str = iter->second.get();
285 char *val = str->str;
286 str->str = nullptr;
287 return val;
288 } else {
289 return nullptr;
290 }
291 }
292 /**
293 * The processing of this data packet has been completed, and the relevant memory has been released
294 */
pop()295 void pop() {
296 if (buffer_->is_end()) {
297 packet_pool_.erase(buffer_->info.msg_id);
298 }
299 }
300 };
301
302 //------------------------------------ReactorThread-------------------------------------------
/**
 * State owned by one reactor thread: the std::thread handle, its pipe sockets and its
 * own MessageBus. Server::get_thread() returns these by reactor id.
 */
struct ReactorThread {
    int id;
    std::thread thread;
    network::Socket *notify_pipe = nullptr;
    uint32_t pipe_num = 0;
    uint64_t dispatch_count = 0;
    // Indexed by pipe fd (see Server::get_worker_pipe_socket)
    network::Socket *pipe_sockets = nullptr;
    network::Socket *pipe_command = nullptr;
    MessageBus message_bus;

    int init(Server *serv, Reactor *reactor, uint16_t reactor_id);
};
315
/**
 * Atomic counters kept per listening port (referenced from ListenPort::gs).
 * The same counter names also exist server-wide in ServerGS.
 */
struct ServerPortGS {
    sw_atomic_t connection_num;
    sw_atomic_long_t abort_count;
    sw_atomic_long_t accept_count;
    sw_atomic_long_t close_count;
    sw_atomic_long_t dispatch_count;
    sw_atomic_long_t request_count;
    sw_atomic_long_t response_count;
    sw_atomic_long_t total_recv_bytes;
    sw_atomic_long_t total_send_bytes;
};
327
328 struct ListenPort {
329 /**
330 * tcp socket listen backlog
331 */
332 uint16_t backlog = SW_BACKLOG;
333 bool listening = false;
334 /**
335 * open tcp_defer_accept option
336 */
337 int tcp_defer_accept = 0;
338 /**
339 * TCP_FASTOPEN
340 */
341 int tcp_fastopen = 0;
342 /**
343 * TCP KeepAlive
344 */
345 int tcp_keepidle = SW_TCP_KEEPIDLE;
346 int tcp_keepinterval = SW_TCP_KEEPINTERVAL;
347 int tcp_keepcount = SW_TCP_KEEPCOUNT;
348
349 int tcp_user_timeout = 0;
350
351 uint16_t max_idle_time = 0;
352
353 int socket_buffer_size = network::Socket::default_buffer_size;
354 uint32_t buffer_high_watermark = 0;
355 uint32_t buffer_low_watermark = 0;
356
357 SocketType type = SW_SOCK_TCP;
358 uint8_t ssl = 0;
359 std::string host;
360 int port = 0;
361 network::Socket *socket = nullptr;
362 pthread_t thread_id = 0;
363
364 uint16_t heartbeat_idle_time = 0;
365
366 /**
367 * check data eof
368 */
369 bool open_eof_check = false;
370 /**
371 * built-in http protocol
372 */
373 bool open_http_protocol = false;
374 /**
375 * built-in http2.0 protocol
376 */
377 bool open_http2_protocol = false;
378 /**
379 * built-in websocket protocol
380 */
381 bool open_websocket_protocol = false;
382 /**
383 * open websocket close frame
384 */
385 bool open_websocket_close_frame = false;
386 /**
387 * open websocket ping frame
388 */
389 bool open_websocket_ping_frame = false;
390 /**
391 * open websocket pong frame
392 */
393 bool open_websocket_pong_frame = false;
394 /**
395 * one package: length check
396 */
397 bool open_length_check = false;
398 /**
399 * for mqtt protocol
400 */
401 bool open_mqtt_protocol = false;
402 /**
403 * redis protocol
404 */
405 bool open_redis_protocol = false;
406 /**
407 * open tcp nodelay option
408 */
409 bool open_tcp_nodelay = false;
410 /**
411 * open tcp nopush option(for sendfile)
412 */
413 bool open_tcp_nopush = true;
414 /**
415 * open tcp keepalive
416 */
417 bool open_tcp_keepalive = false;
418 /**
419 * Sec-WebSocket-Protocol
420 */
421 std::string websocket_subprotocol;
422 /**
423 * set socket option
424 */
425 int kernel_socket_recv_buffer_size = 0;
426 int kernel_socket_send_buffer_size = 0;
427
428 #ifdef SW_USE_OPENSSL
429 SSLContext *ssl_context = nullptr;
430 std::unordered_map<std::string, std::shared_ptr<SSLContext>> sni_contexts;
431 #ifdef SW_SUPPORT_DTLS
432 std::unordered_map<int, dtls::Session *> *dtls_sessions = nullptr;
is_dtlsListenPort433 bool is_dtls() {
434 return ssl_context && (ssl_context->protocols & SW_SSL_DTLS);
435 }
436 #endif
437 #endif
438
439 ServerPortGS *gs = nullptr;
440
441 Protocol protocol = {};
442 void *ptr = nullptr;
443
444 int (*onRead)(Reactor *reactor, ListenPort *port, Event *event) = nullptr;
445
is_dgramListenPort446 inline bool is_dgram() {
447 return network::Socket::is_dgram(type);
448 }
449
is_streamListenPort450 inline bool is_stream() {
451 return network::Socket::is_stream(type);
452 }
453
454 inline void set_eof_protocol(const std::string &eof, bool find_from_right = false) {
455 open_eof_check = true;
456 protocol.split_by_eof = !find_from_right;
457 protocol.package_eof_len = std::min(eof.length(), sizeof(protocol.package_eof));
458 memcpy(protocol.package_eof, eof.c_str(), protocol.package_eof_len);
459 }
460
set_length_protocolListenPort461 inline void set_length_protocol(uint32_t length_offset, char length_type, uint32_t body_offset) {
462 open_length_check = true;
463 protocol.package_length_type = length_type;
464 protocol.package_length_size = swoole_type_size(length_type);
465 protocol.package_body_offset = length_offset;
466 protocol.package_body_offset = body_offset;
467 }
468
469 ListenPort();
470 ~ListenPort() = default;
471 int listen();
472 void close();
473 bool import(int sock);
474 const char *get_protocols();
475
476 #ifdef SW_USE_OPENSSL
477 bool ssl_create_context(SSLContext *context);
478 bool ssl_create(Connection *conn, network::Socket *sock);
479 bool ssl_add_sni_cert(const std::string &name, SSLContext *context);
480 bool ssl_init();
481
ssl_set_key_fileListenPort482 void ssl_set_key_file(const std::string &file) {
483 ssl_context->key_file = file;
484 }
ssl_set_cert_fileListenPort485 void ssl_set_cert_file(const std::string &file) {
486 ssl_context->cert_file = file;
487 }
488 #endif
489 void clear_protocol();
get_socketListenPort490 inline network::Socket *get_socket() {
491 return socket;
492 }
get_portListenPort493 int get_port() {
494 return port;
495 }
get_hostListenPort496 const char *get_host() {
497 return host.c_str();
498 }
get_typeListenPort499 SocketType get_type() {
500 return type;
501 }
get_fdListenPort502 int get_fd() {
503 return socket ? socket->fd : -1;
504 }
505 };
506
/**
 * Server-wide state reached through Server::gs: process ids, fd range, atomic
 * statistics counters and the two worker pools.
 */
struct ServerGS {
    pid_t master_pid;
    pid_t manager_pid;

    SessionId session_round;
    sw_atomic_t start;
    sw_atomic_t shutdown;

    // Read and written through Server::get_maxfd()/set_maxfd() and get_minfd()/set_minfd()
    int max_fd;
    int min_fd;

    time_t start_time;
    sw_atomic_t connection_num;
    sw_atomic_t tasking_num;
    sw_atomic_long_t abort_count;
    sw_atomic_long_t accept_count;
    sw_atomic_long_t close_count;
    sw_atomic_long_t dispatch_count;
    sw_atomic_long_t request_count;
    sw_atomic_long_t response_count;
    sw_atomic_long_t total_recv_bytes;
    sw_atomic_long_t total_send_bytes;
    sw_atomic_long_t pipe_packet_msg_id;

    sw_atomic_t spinlock;

#ifdef HAVE_PTHREAD_BARRIER
    pthread_barrier_t manager_barrier;
    pthread_barrierattr_t manager_barrier_attr;
#endif

    // Server::get_worker() indexes into the `workers` arrays of these pools
    ProcessPool task_workers;
    ProcessPool event_workers;
};
541
542 class Factory {
543 protected:
544 Server *server_;
545
546 public:
Factory(Server * _server)547 Factory(Server *_server) {
548 server_ = _server;
549 }
~Factory()550 virtual ~Factory() {}
551 virtual bool start() = 0;
552 virtual bool shutdown() = 0;
553 virtual bool dispatch(SendData *) = 0;
554 virtual bool finish(SendData *) = 0;
555 virtual bool notify(DataHead *) = 0;
556 virtual bool end(SessionId sesion_id, int flags) = 0;
557 };
558
/**
 * Factory implementation whose members are all defined out of line.
 * NOTE(review): presumably the factory used when the server runs in MODE_BASE; confirm.
 */
class BaseFactory : public Factory {
  public:
    BaseFactory(Server *server) : Factory(server) {}
    ~BaseFactory();
    bool start() override;
    bool shutdown() override;
    bool dispatch(SendData *) override;
    bool finish(SendData *) override;
    bool notify(DataHead *) override;
    bool end(SessionId sesion_id, int flags) override;
};
570
/**
 * Factory implementation that additionally owns a list of shared UnixSocket pipes.
 * NOTE(review): presumably the factory used when the server runs in MODE_PROCESS; confirm.
 */
class ProcessFactory : public Factory {
  private:
    // Shared ownership of the UnixSocket pipes held by this factory
    std::vector<std::shared_ptr<UnixSocket>> pipes;

  public:
    ProcessFactory(Server *server);
    ~ProcessFactory();
    bool start() override;
    bool shutdown() override;
    bool dispatch(SendData *) override;
    bool finish(SendData *) override;
    bool notify(DataHead *) override;
    bool end(SessionId sesion_id, int flags) override;
};
585
/**
 * Event type identifiers. The enumerators have no explicit values, so they are
 * numbered from 0 in declaration order; inserting or reordering entries changes them.
 */
enum ServerEventType {
    // recv data payload
    SW_SERVER_EVENT_RECV_DATA,
    SW_SERVER_EVENT_RECV_DGRAM,
    // send data
    SW_SERVER_EVENT_SEND_DATA,
    SW_SERVER_EVENT_SEND_FILE,
    // connection event
    SW_SERVER_EVENT_CLOSE,
    SW_SERVER_EVENT_CONNECT,
    SW_SERVER_EVENT_CLOSE_FORCE,
    // task
    SW_SERVER_EVENT_TASK,
    SW_SERVER_EVENT_FINISH,
    // pipe
    SW_SERVER_EVENT_PIPE_MESSAGE,
    // event operate
    SW_SERVER_EVENT_PAUSE_RECV,
    SW_SERVER_EVENT_RESUME_RECV,
    // buffer event
    SW_SERVER_EVENT_BUFFER_FULL,
    SW_SERVER_EVENT_BUFFER_EMPTY,
    // process message
    SW_SERVER_EVENT_INCOMING,
    SW_SERVER_EVENT_SHUTDOWN,
    SW_SERVER_EVENT_COMMAND_REQUEST,
    SW_SERVER_EVENT_COMMAND_RESPONSE,
};
614
615 class Server {
616 public:
// Function-pointer type with the same signature as the dispatch_func member.
typedef int (*DispatchFunction)(Server *, Connection *, SendData *);
618
/**
 * Description of a registered command (see add_command and the `commands` map).
 */
struct Command {
    typedef std::function<void(Server *, const std::string &msg)> Callback;
    typedef std::function<std::string(Server *, const std::string &msg)> Handler;
    // Bit flags, combinable with |. Note that the lowest bit (1u << 0) is unused.
    enum ProcessType {
        MASTER = 1u << 1,
        REACTOR_THREAD = 1u << 2,
        EVENT_WORKER = 1u << 3,
        TASK_WORKER = 1u << 4,
        MANAGER = 1u << 5,
        ALL_PROCESS = MASTER | REACTOR_THREAD | EVENT_WORKER | TASK_WORKER | MANAGER,
    };
    int id;
    // Bitwise OR of ProcessType values
    int accepted_process_types;
    std::string name;
};
634
// Server mode, chosen in the constructor (default MODE_BASE) and queried through
// is_base_mode() / is_process_mode().
enum Mode {
    MODE_BASE = 1,
    MODE_PROCESS = 2,
};
639
// Values for task_ipc_mode (which defaults to TASK_IPC_UNIXSOCK).
enum TaskIpcMode {
    TASK_IPC_UNIXSOCK = 1,
    TASK_IPC_MSGQUEUE = 2,
    TASK_IPC_PREEMPTIVE = 3,
    TASK_IPC_STREAM = 4,
};
646
// Identifiers for the thread kinds: master, reactor and heartbeat.
enum ThreadType {
    THREAD_MASTER = 1,
    THREAD_REACTOR = 2,
    THREAD_HEARTBEAT = 3,
};
652
// Values for dispatch_mode (which defaults to DISPATCH_FDMOD). The last two enumerators
// have no explicit value and therefore continue from DISPATCH_STREAM: 8 and 9.
enum DispatchMode {
    DISPATCH_ROUND = 1,
    DISPATCH_FDMOD = 2,
    DISPATCH_IDLE_WORKER = 3,
    DISPATCH_IPMOD = 4,
    DISPATCH_UIDMOD = 5,
    DISPATCH_USERFUNC = 6,
    DISPATCH_STREAM = 7,
    DISPATCH_CO_CONN_LB,
    DISPATCH_CO_REQ_LB,
};
664
// Negative result codes.
// NOTE(review): presumably returned by a user dispatch function in place of a worker id; confirm with the dispatch code.
enum FactoryDispatchResult {
    DISPATCH_RESULT_DISCARD_PACKET = -1,
    DISPATCH_RESULT_CLOSE_CONNECTION = -2,
    DISPATCH_RESULT_USERFUNC_FALLBACK = -3,
};
670
// Hook slots, numbered from 0 in declaration order. HOOK_END is pinned to
// SW_MAX_HOOK_TYPE - 1, the last valid index of the hooks[SW_MAX_HOOK_TYPE] array.
enum HookType {
    HOOK_MASTER_START,
    HOOK_MASTER_TIMER,
    HOOK_REACTOR_START,
    HOOK_WORKER_START,
    HOOK_TASK_WORKER_START,
    HOOK_MASTER_CONNECT,
    HOOK_REACTOR_CONNECT,
    HOOK_WORKER_CONNECT,
    HOOK_REACTOR_RECEIVE,
    HOOK_WORKER_RECEIVE,
    HOOK_REACTOR_CLOSE,
    HOOK_WORKER_CLOSE,
    HOOK_MANAGER_START,
    HOOK_MANAGER_TIMER,
    HOOK_PROCESS_TIMER,
    HOOK_END = SW_MAX_HOOK_TYPE - 1,
};
689
// Bit flags, combinable with |. The lowest bit (1u << 0) is unused.
// NOTE(review): presumably passed as the `flags` argument of Factory::end(); confirm.
enum CloseFlag {
    CLOSE_RESET = 1u << 1,
    CLOSE_ACTIVELY = 1u << 2,
};
694
/**
 * reactor thread/process num
 */
uint16_t reactor_num = 0;
/**
 * worker process num
 */
uint32_t worker_num = 0;

/**
 * Added to reactor_num in get_worker_buffer_num()
 */
uint8_t dgram_port_num = 0;

/**
 * package dispatch mode (a DispatchMode value)
 */
uint8_t dispatch_mode = DISPATCH_FDMOD;

/**
 * No idle worker process is available.
 */
bool scheduler_warning = false;

int worker_uid = 0;
int worker_groupid = 0;

/**
 * worker process max request
 */
uint32_t max_request = 0;
uint32_t max_request_grace = 0;

network::Socket *udp_socket_ipv4 = nullptr;
network::Socket *udp_socket_ipv6 = nullptr;
network::Socket *dgram_socket = nullptr;
int null_fd = -1;

uint32_t max_wait_time = SW_WORKER_MAX_WAIT_TIME;

/*----------------------------Reactor schedule--------------------------------*/
sw_atomic_t worker_round_id = 0;

/**
 * worker(worker and task_worker) process chroot / user / group
 */
std::string chroot_;
std::string user_;
std::string group_;

/**
 * run as a daemon process
 */
bool daemonize = false;
/**
 * have dgram socket
 */
bool have_dgram_sock = false;
/**
 * have stream socket
 */
bool have_stream_sock = false;
/**
 * open cpu affinity setting
 */
bool open_cpu_affinity = false;
/**
 * disable notice when use SW_DISPATCH_ROUND and SW_DISPATCH_QUEUE
 */
bool disable_notify = false;
/**
 * discard the timeout request
 */
bool discard_timeout_request = false;
/**
 * parse cookie header
 */
bool http_parse_cookie = true;
/**
 * parse x-www-form-urlencoded data
 */
bool http_parse_post = true;
/**
 * parse multipart/form-data files to match $_FILES
 */
bool http_parse_files = false;
/**
 * http content compression
 */
bool http_compression = false;
/**
 * RFC-7692
 */
bool websocket_compression = false;
/**
 * handle static files
 */
bool enable_static_handler = false;
/**
 * show file list in the current directory
 */
bool http_autoindex = false;
/**
 * enable onConnect/onClose event when use dispatch_mode=1/3
 * (returned by is_support_unsafe_events() for the ROUND, IDLE_WORKER and STREAM modes)
 */
bool enable_unsafe_event = false;
/**
 * waiting for worker onConnect callback function to return
 */
bool enable_delay_receive = false;
/**
 * reuse port
 */
bool enable_reuse_port = false;
/**
 * asynchronous reloading
 */
bool reload_async = true;
/**
 * use event object
 */
bool event_object = false;
/**
 * use task object
 */
bool task_object = false;
/**
 * enable coroutine in task worker
 */
bool task_enable_coroutine = false;
/**
 * yield coroutine when the output buffer is full
 */
bool send_yield = true;
/**
 * enable coroutine
 */
bool enable_coroutine = true;
/**
 * disable multi-threads
 */
bool single_thread = false;
/**
 * server status
 */
bool running = true;

int *cpu_affinity_available = 0;
int cpu_affinity_available_num = 0;

UnixSocket *pipe_command = nullptr;
MessageBus message_bus;

double send_timeout = 0;

uint16_t heartbeat_check_interval = 0;

time_t reload_time = 0;
time_t warning_time = 0;
long timezone_ = 0;
TimerNode *master_timer = nullptr;
TimerNode *heartbeat_timer = nullptr;

/* buffer output/input setting*/
uint32_t output_buffer_size = UINT_MAX;
uint32_t input_buffer_size = SW_INPUT_BUFFER_SIZE;
uint32_t max_queued_bytes = 0;

/**
 * the master process and worker process communicate using unix socket dgram.
 * ipc_max_size represents the maximum size of each datagram,
 * which is obtained from the kernel send buffer of unix socket in swServer_set_ipc_max_size function.
 */
uint32_t ipc_max_size = SW_IPC_MAX_SIZE;

void *private_data_1 = nullptr;
void *private_data_2 = nullptr;
void *private_data_3 = nullptr;

Factory *factory = nullptr;
Manager *manager = nullptr;

/**
 * Listening ports; get_primary_port() returns the first element
 */
std::vector<ListenPort *> ports;
875
get_primary_port()876 inline ListenPort *get_primary_port() {
877 return ports.front();
878 }
879
get_port(int _port)880 inline ListenPort *get_port(int _port) {
881 for (auto port : ports) {
882 if (port->port == _port || _port == 0) {
883 return port;
884 }
885 }
886 return nullptr;
887 }
888
get_port_by_server_fd(int server_fd)889 inline ListenPort *get_port_by_server_fd(int server_fd) {
890 return (ListenPort *) connection_list[server_fd].object;
891 }
892
get_port_by_fd(int fd)893 inline ListenPort *get_port_by_fd(int fd) {
894 return get_port_by_server_fd(connection_list[fd].server_fd);
895 }
896
get_port_by_session_id(SessionId session_id)897 inline ListenPort *get_port_by_session_id(SessionId session_id) {
898 Connection *conn = get_connection_by_session_id(session_id);
899 if (!conn) {
900 return nullptr;
901 }
902 return get_port_by_fd(conn->fd);
903 }
904
get_server_socket(int fd)905 inline network::Socket *get_server_socket(int fd) {
906 return connection_list[fd].socket;
907 }
908
909 /**
910 * [ReactorThread]
911 */
get_worker_pipe_socket(Worker * worker)912 network::Socket *get_worker_pipe_socket(Worker *worker) {
913 return &get_thread(SwooleTG.id)->pipe_sockets[worker->pipe_master->fd];
914 }
915
get_command_reply_socket()916 network::Socket *get_command_reply_socket() {
917 return is_base_mode() ? get_worker(0)->pipe_master : pipe_command->get_socket(false);
918 }
919
920 /**
921 * [Worker|Master]
922 */
get_reactor_pipe_socket(SessionId session_id,int reactor_id)923 inline network::Socket *get_reactor_pipe_socket(SessionId session_id, int reactor_id) {
924 int pipe_index = session_id % reactor_pipe_num;
925 /**
926 * pipe_worker_id: The pipe in which worker.
927 */
928 int pipe_worker_id = reactor_id + (pipe_index * reactor_num);
929 Worker *worker = get_worker(pipe_worker_id);
930 return worker->pipe_worker;
931 }
932
/**
 * task process
 */
uint32_t task_worker_num = 0;
// a TaskIpcMode value
uint8_t task_ipc_mode = TASK_IPC_UNIXSOCK;
uint32_t task_max_request = 0;
uint32_t task_max_request_grace = 0;
std::vector<std::shared_ptr<Pipe>> task_notify_pipes;
EventData *task_result = nullptr;

/**
 * user process
 */
std::vector<Worker *> user_worker_list;
std::unordered_map<pid_t, Worker *> user_worker_map;
// Array indexed by (worker_id - worker_num - task_worker_num) in get_worker()
Worker *user_workers = nullptr;

/**
 * Registered commands, their handlers and pending callbacks, with the next ids to hand out
 */
std::unordered_map<std::string, Command> commands;
std::unordered_map<int, Command::Handler> command_handlers;
std::unordered_map<int64_t, Command::Callback> command_callbacks;
int command_current_id = 1;
int64_t command_current_request_id = 1;

// Array scanned by get_lowest_load_worker_id() over [0, worker_num)
Worker *workers = nullptr;
ServerGS *gs = nullptr;

std::unordered_set<std::string> *types = nullptr;
std::unordered_set<std::string> *locations = nullptr;
std::vector<std::string> *http_index_files = nullptr;

#ifdef HAVE_PTHREAD_BARRIER
pthread_barrier_t reactor_thread_barrier = {};
#endif

/**
 * temporary directory for HTTP uploaded file.
 */
std::string upload_tmp_dir = "/tmp";
/**
 * http compression level for gzip/br
 */
#ifdef SW_HAVE_COMPRESSION
uint8_t http_compression_level = 0;
// NOTE(review): no in-class initializer here; presumably assigned elsewhere before use; confirm
uint32_t compression_min_length;
#endif
/**
 * path of the file holding the master process pid
 */
std::string pid_file;
/**
 * stream
 */
char *stream_socket_file = nullptr;
network::Socket *stream_socket = nullptr;
Protocol stream_protocol = {};
network::Socket *last_stream_socket = nullptr;
EventData *last_task = nullptr;
std::queue<String *> *buffer_pool = nullptr;

// Allocator handed to make_string() in get_recv_buffer()
const Allocator *recv_buffer_allocator = &SwooleG.std_allocator;
size_t recv_buffer_size = SW_BUFFER_SIZE_BIG;

int manager_alarm = 0;

/**
 * message queue key
 */
uint64_t message_queue_key = 0;

// One slot per hook type; HookType::HOOK_END is the last valid index
void *hooks[SW_MAX_HOOK_TYPE] = {};

/*----------------------------Event Callback--------------------------------*/
/**
 * Master Process
 */
std::function<void(Server *)> onStart;
std::function<void(Server *)> onBeforeShutdown;
std::function<void(Server *)> onShutdown;
/**
 * Manager Process
 */
std::function<void(Server *)> onManagerStart;
std::function<void(Server *)> onManagerStop;
std::function<void(Server *, int, const ExitStatus &)> onWorkerError;
std::function<void(Server *)> onBeforeReload;
std::function<void(Server *)> onAfterReload;
/**
 * Worker Process
 */
std::function<void(Server *, EventData *)> onPipeMessage;
std::function<void(Server *, uint32_t)> onWorkerStart;
std::function<void(Server *, uint32_t)> onWorkerStop;
std::function<void(Server *, uint32_t)> onWorkerExit;
std::function<void(Server *, Worker *)> onUserWorkerStart;
/**
 * Connection
 */
std::function<int(Server *, RecvData *)> onReceive;
std::function<int(Server *, RecvData *)> onPacket;
std::function<void(Server *, DataHead *)> onClose;
std::function<void(Server *, DataHead *)> onConnect;
std::function<void(Server *, DataHead *)> onBufferFull;
std::function<void(Server *, DataHead *)> onBufferEmpty;
/**
 * Task Worker
 */
std::function<int(Server *, EventData *)> onTask;
std::function<int(Server *, EventData *)> onFinish;

/**
 * Hook: optional dispatch function (same signature as DispatchFunction)
 */
int (*dispatch_func)(Server *, Connection *, SendData *) = nullptr;
1046
1047 public:
/**
 * Constructs a server in the given mode; MODE_BASE when no mode is given.
 */
Server(enum Mode _mode = MODE_BASE);
~Server();
1050
set_document_root(const std::string & path)1051 bool set_document_root(const std::string &path) {
1052 if (path.length() > PATH_MAX) {
1053 swoole_warning("The length of document_root must be less than %d", PATH_MAX);
1054 return false;
1055 }
1056
1057 char _realpath[PATH_MAX];
1058 if (!realpath(path.c_str(), _realpath)) {
1059 swoole_warning("document_root[%s] does not exist", path.c_str());
1060 return false;
1061 }
1062
1063 document_root = std::string(_realpath);
1064 return true;
1065 }
1066
// Static file handler (defined out of line)
void add_static_handler_location(const std::string &);
void add_static_handler_index_files(const std::string &);
bool select_static_handler(http_server::Request *request, Connection *conn);

// Lifecycle (defined out of line)
int create();
int start();
void shutdown();

// Registration of workers, ports, hooks, commands and connections (defined out of line)
int add_worker(Worker *worker);
ListenPort *add_port(SocketType type, const char *host, int port);
int add_systemd_socket();
int add_hook(enum HookType type, const Callback &func, int push_back);
bool add_command(const std::string &command, int accepted_process_types, const Command::Handler &func);
Connection *add_connection(ListenPort *ls, network::Socket *_socket, int server_fd);
void abort_connection(Reactor *reactor, ListenPort *ls, network::Socket *_socket);
int connection_incoming(Reactor *reactor, Connection *conn);

// Worker and task statistics (defined out of line)
int get_idle_worker_num();
int get_idle_task_worker_num();
int get_task_count();
1087
get_minfd()1088 inline int get_minfd() {
1089 return gs->min_fd;
1090 }
1091
get_maxfd()1092 inline int get_maxfd() {
1093 return gs->max_fd;
1094 }
1095
set_maxfd(int maxfd)1096 inline void set_maxfd(int maxfd) {
1097 gs->max_fd = maxfd;
1098 }
1099
set_minfd(int minfd)1100 inline void set_minfd(int minfd) {
1101 gs->min_fd = minfd;
1102 }
1103
// Defined out of line.
void store_listen_socket();
void store_pipe_fd(UnixSocket *p);
1106
get_document_root()1107 inline const std::string &get_document_root() {
1108 return document_root;
1109 }
1110
get_recv_buffer(swSocket * _socket)1111 inline String *get_recv_buffer(swSocket *_socket) {
1112 String *buffer = _socket->recv_buffer;
1113 if (buffer == nullptr) {
1114 buffer = swoole::make_string(SW_BUFFER_SIZE_BIG, recv_buffer_allocator);
1115 if (!buffer) {
1116 return nullptr;
1117 }
1118 _socket->recv_buffer = buffer;
1119 }
1120
1121 return buffer;
1122 }
1123
get_worker_buffer_num()1124 inline uint32_t get_worker_buffer_num() {
1125 return is_base_mode() ? 1 : reactor_num + dgram_port_num;
1126 }
1127
is_support_unsafe_events()1128 inline bool is_support_unsafe_events() {
1129 if (dispatch_mode != DISPATCH_ROUND && dispatch_mode != DISPATCH_IDLE_WORKER &&
1130 dispatch_mode != DISPATCH_STREAM) {
1131 return true;
1132 } else {
1133 return enable_unsafe_event;
1134 }
1135 }
1136
is_process_mode()1137 inline bool is_process_mode() {
1138 return mode_ == MODE_PROCESS;
1139 }
1140
is_base_mode()1141 inline bool is_base_mode() {
1142 return mode_ == MODE_BASE;
1143 }
1144
is_enable_coroutine()1145 inline bool is_enable_coroutine() {
1146 if (is_task_worker()) {
1147 return task_enable_coroutine;
1148 } else if (is_manager()) {
1149 return false;
1150 } else {
1151 return enable_coroutine;
1152 }
1153 }
1154
is_hash_dispatch_mode()1155 inline bool is_hash_dispatch_mode() {
1156 return dispatch_mode == DISPATCH_FDMOD || dispatch_mode == DISPATCH_IPMOD ||
1157 dispatch_mode == DISPATCH_CO_CONN_LB;
1158 }
1159
is_support_send_yield()1160 inline bool is_support_send_yield() {
1161 return is_hash_dispatch_mode();
1162 }
1163
if_require_packet_callback(ListenPort * port,bool isset)1164 inline bool if_require_packet_callback(ListenPort *port, bool isset) {
1165 #ifdef SW_USE_OPENSSL
1166 return (port->is_dgram() && !port->ssl && !isset);
1167 #else
1168 return (port->is_dgram() && !isset);
1169 #endif
1170 }
1171
if_require_receive_callback(ListenPort * port,bool isset)1172 inline bool if_require_receive_callback(ListenPort *port, bool isset) {
1173 #ifdef SW_USE_OPENSSL
1174 return (((port->is_dgram() && port->ssl) || port->is_stream()) && !isset);
1175 #else
1176 return (port->is_stream() && !isset);
1177 #endif
1178 }
1179
get_worker(uint16_t worker_id)1180 inline Worker *get_worker(uint16_t worker_id) {
1181 // Event Worker
1182 if (worker_id < worker_num) {
1183 return &(gs->event_workers.workers[worker_id]);
1184 }
1185
1186 // Task Worker
1187 uint32_t task_worker_max = task_worker_num + worker_num;
1188 if (worker_id < task_worker_max) {
1189 return &(gs->task_workers.workers[worker_id - worker_num]);
1190 }
1191
1192 // User Worker
1193 uint32_t user_worker_max = task_worker_max + user_worker_list.size();
1194 if (worker_id < user_worker_max) {
1195 return &(user_workers[worker_id - task_worker_max]);
1196 }
1197
1198 return nullptr;
1199 }
1200
get_lowest_load_worker_id()1201 int get_lowest_load_worker_id() {
1202 uint32_t lowest_load_worker_id = 0;
1203 size_t min_coroutine = workers[0].coroutine_num;
1204 for (uint32_t i = 1; i < worker_num; i++) {
1205 if (workers[i].coroutine_num < min_coroutine) {
1206 min_coroutine = workers[i].coroutine_num;
1207 lowest_load_worker_id = i;
1208 continue;
1209 }
1210 }
1211 return lowest_load_worker_id;
1212 }
1213
// Declaration only; defined elsewhere.
// NOTE(review): presumably begins shutdown of the given worker — confirm against the definition.
1214 void stop_async_worker(Worker *worker);
1215
get_pipe_object(int pipe_fd)1216 inline Pipe *get_pipe_object(int pipe_fd) {
1217 return (Pipe *) connection_list[pipe_fd].object;
1218 }
1219
get_all_worker_num()1220 size_t get_all_worker_num() {
1221 return worker_num + task_worker_num + get_user_worker_num();
1222 }
1223
get_user_worker_num()1224 size_t get_user_worker_num() {
1225 return user_worker_list.size();
1226 }
1227
get_thread(int reactor_id)1228 inline ReactorThread *get_thread(int reactor_id) {
1229 return &reactor_threads[reactor_id];
1230 }
1231
is_started()1232 inline bool is_started() {
1233 return gs->start;
1234 }
1235
is_created()1236 bool is_created() {
1237 return factory != nullptr;
1238 }
1239
is_master()1240 bool is_master() {
1241 return SwooleG.process_type == SW_PROCESS_MASTER;
1242 }
1243
is_worker()1244 bool is_worker() {
1245 return SwooleG.process_type == SW_PROCESS_WORKER;
1246 }
1247
is_task_worker()1248 bool is_task_worker() {
1249 return SwooleG.process_type == SW_PROCESS_TASKWORKER;
1250 }
1251
is_manager()1252 bool is_manager() {
1253 return SwooleG.process_type == SW_PROCESS_MANAGER;
1254 }
1255
is_user_worker()1256 bool is_user_worker() {
1257 return SwooleG.process_type == SW_PROCESS_USERWORKER;
1258 }
1259
is_reactor_thread()1260 bool is_reactor_thread() {
1261 return SwooleG.process_type == SW_PROCESS_MASTER && SwooleTG.type == Server::THREAD_REACTOR;
1262 }
1263
isset_hook(enum HookType type)1264 bool isset_hook(enum HookType type) {
1265 assert(type <= HOOK_END);
1266 return hooks[type];
1267 }
1268
is_sync_process()1269 bool is_sync_process() {
1270 if (is_manager()) {
1271 return true;
1272 }
1273 if (is_task_worker() && !task_enable_coroutine) {
1274 return true;
1275 }
1276 return false;
1277 }
is_shutdown()1278 inline bool is_shutdown() {
1279 return gs->shutdown;
1280 }
1281
1282 // can only be used in the main process
is_valid_connection(Connection * conn)1283 inline bool is_valid_connection(Connection *conn) {
1284 return (conn && conn->socket && conn->active && conn->socket->fd_type == SW_FD_SESSION);
1285 }
1286
// Declaration only; defined elsewhere.
// NOTE(review): presumably a liveness/timeout check of `conn` relative to the timestamp `now` — confirm.
1287 bool is_healthy_connection(double now, Connection *conn);
1288
is_dgram_event(uint8_t type)1289 static int is_dgram_event(uint8_t type) {
1290 switch (type) {
1291 case SW_SERVER_EVENT_RECV_DGRAM:
1292 return true;
1293 default:
1294 return false;
1295 }
1296 }
1297
is_stream_event(uint8_t type)1298 static int is_stream_event(uint8_t type) {
1299 switch (type) {
1300 case SW_SERVER_EVENT_RECV_DATA:
1301 case SW_SERVER_EVENT_SEND_DATA:
1302 case SW_SERVER_EVENT_SEND_FILE:
1303 case SW_SERVER_EVENT_CONNECT:
1304 case SW_SERVER_EVENT_CLOSE:
1305 case SW_SERVER_EVENT_PAUSE_RECV:
1306 case SW_SERVER_EVENT_RESUME_RECV:
1307 case SW_SERVER_EVENT_BUFFER_FULL:
1308 case SW_SERVER_EVENT_BUFFER_EMPTY:
1309 return true;
1310 default:
1311 return false;
1312 }
1313 }
1314
get_connection_fd(SessionId session_id)1315 inline int get_connection_fd(SessionId session_id) {
1316 return session_list[session_id % SW_SESSION_LIST_SIZE].fd;
1317 }
1318
get_connection_verify_no_ssl(SessionId session_id)1319 inline Connection *get_connection_verify_no_ssl(SessionId session_id) {
1320 Session *session = get_session(session_id);
1321 int fd = session->fd;
1322 Connection *conn = get_connection(fd);
1323 if (!conn || conn->active == 0) {
1324 return nullptr;
1325 }
1326 if (session->id != session_id || conn->session_id != session_id) {
1327 return nullptr;
1328 }
1329 return conn;
1330 }
1331
get_connection_verify(SessionId session_id)1332 inline Connection *get_connection_verify(SessionId session_id) {
1333 Connection *conn = get_connection_verify_no_ssl(session_id);
1334 #ifdef SW_USE_OPENSSL
1335 if (conn && conn->ssl && !conn->ssl_ready) {
1336 return nullptr;
1337 }
1338 #endif
1339 return conn;
1340 }
1341
get_connection(int fd)1342 inline Connection *get_connection(int fd) {
1343 if ((uint32_t) fd > max_connection) {
1344 return nullptr;
1345 }
1346 return &connection_list[fd];
1347 }
1348
get_connection_for_iterator(int fd)1349 inline Connection *get_connection_for_iterator(int fd) {
1350 Connection *conn = get_connection(fd);
1351 if (conn && conn->active && !conn->closed) {
1352 #ifdef SW_USE_OPENSSL
1353 if (conn->ssl && !conn->ssl_ready) {
1354 return nullptr;
1355 }
1356 #endif
1357 return conn;
1358 }
1359 return nullptr;
1360 }
1361
get_connection_by_session_id(SessionId session_id)1362 inline Connection *get_connection_by_session_id(SessionId session_id) {
1363 return get_connection(get_connection_fd(session_id));
1364 }
1365
get_session(SessionId session_id)1366 inline Session *get_session(SessionId session_id) {
1367 return &session_list[session_id % SW_SESSION_LIST_SIZE];
1368 }
1369
lock()1370 inline void lock() {
1371 lock_.lock();
1372 }
1373
unlock()1374 inline void unlock() {
1375 lock_.unlock();
1376 }
1377
// Port and timer housekeeping; declarations only (bodies are defined elsewhere).
// NOTE(review): `only_stream_port` presumably restricts closing to stream ports — confirm.
1378 void close_port(bool only_stream_port);
1379 void clear_timer();
1380 static void timer_callback(Timer *timer, TimerNode *tnode);
1381
// Worker / manager start-up steps; declarations only.
// NOTE(review): the meaning of the int return values is not visible here — check the definitions.
1382 int create_task_workers();
1383 int create_user_workers();
1384 int start_manager_process();
1385
// Hook / command dispatch helpers and static event handlers; declarations only.
// The static members taking (Reactor *, Event *) have the shape of reactor event
// callbacks — presumably registered as such; confirm at the registration sites.
// accept_dtls_connection is only declared when SW_SUPPORT_DTLS is defined.
1386 void call_hook(enum HookType type, void *arg);
1387 void call_worker_start_callback(Worker *worker);
1388 ResultCode call_command_handler(MessageBus &mb, uint16_t worker_id, network::Socket *sock);
1389 std::string call_command_handler_in_master(int command_id, const std::string &msg);
1390 void call_command_callback(int64_t request_id, const std::string &result);
1391 void foreach_connection(const std::function<void(Connection *)> &callback);
1392 static int accept_connection(Reactor *reactor, Event *event);
1393 #ifdef SW_SUPPORT_DTLS
1394 dtls::Session *accept_dtls_connection(ListenPort *ls, network::Address *sa);
1395 #endif
1396 static int accept_command_result(Reactor *reactor, Event *event);
1397 static int close_connection(Reactor *reactor, network::Socket *_socket);
1398 static int dispatch_task(const Protocol *proto, network::Socket *_socket, const RecvData *rdata);
1399
// Low-level send primitives; declarations only.
// The (Worker *, buf, len, flags) overload is the one the inline
// send_to_worker_from_worker(WorkerId, EventData *, int) wrapper forwards to.
1400 int send_to_connection(SendData *);
1401 ssize_t send_to_worker_from_worker(Worker *dst_worker, const void *buf, size_t len, int flags);
1402
send_to_worker_from_worker(WorkerId id,EventData * data,int flags)1403 ssize_t send_to_worker_from_worker(WorkerId id, EventData *data, int flags) {
1404 return send_to_worker_from_worker(get_worker(id), data, sizeof(data->info) + data->info.len, flags);
1405 }
1406
// Declarations only; defined elsewhere.
// NOTE(review): `sendn` is presumably the number of bytes of `ev_data` to send — confirm.
1407 ssize_t send_to_reactor_thread(const EventData *ev_data, size_t sendn, SessionId session_id);
1408 int reply_task_result(const char *data, size_t data_len, int flags, EventData *current_task);
1409
// Session-addressed operations; declarations only. Each takes the target
// session id and returns bool (presumably success/failure — confirm).
1410 bool send(SessionId session_id, const void *data, uint32_t length);
1411 bool sendfile(SessionId session_id, const char *file, uint32_t l_file, off_t offset, size_t length);
1412 bool sendwait(SessionId session_id, const void *data, uint32_t length);
1413 bool close(SessionId session_id, bool reset);
1414
// Event notification and command submission; declarations only.
// NOTE(review): `fn` in command() is presumably invoked with the command's result — confirm.
1415 bool notify(Connection *conn, enum ServerEventType event);
1416 bool feedback(Connection *conn, enum ServerEventType event);
1417 bool command(WorkerId process_id,
1418 Command::ProcessType process_type,
1419 const std::string &name,
1420 const std::string &msg,
1421 const Command::Callback &fn);
1422
// Initialisation helpers for the individual server components; declarations only.
1423 void init_reactor(Reactor *reactor);
1424 void init_worker(Worker *worker);
1425 void init_task_workers();
1426 void init_port_protocol(ListenPort *port);
1427 void init_signal_handler();
1428 void init_ipc_max_size();
1429
// Setter counterpart of get_max_connection(); declaration only.
// NOTE(review): any validation or clamping of the value happens in the definition — not visible here.
1430 void set_max_connection(uint32_t _max_connection);
1431
get_max_connection()1432 inline uint32_t get_max_connection() {
1433 return max_connection;
1434 }
1435
set_start_session_id(SessionId value)1436 void set_start_session_id(SessionId value) {
1437 if (value > UINT_MAX) {
1438 value = UINT_MAX;
1439 }
1440 gs->session_round = value;
1441 }
1442
// Resource set-up / tear-down helpers; declarations only.
// create_* / release_* and create_worker / destroy_worker appear to be paired
// by name — confirm the pairing in the definitions.
1443 int create_pipe_buffers();
1444 void release_pipe_buffers();
1445 void create_worker(Worker *worker);
1446 void destroy_worker(Worker *worker);
1447 void disable_accept();
1448 void destroy_http_request(Connection *conn);
1449
// Declaration only; defined elsewhere.
// NOTE(review): presumably picks the target worker id for `fd` according to the dispatch mode — confirm.
1450 int schedule_worker(int fd, SendData *data);
1451
1452 /**
1453 * [Manager]
1454 */
// Process-spawning entry points, one per worker kind; declarations only.
// Each returns a pid_t (presumably the child's pid — confirm error convention).
1455 pid_t spawn_event_worker(Worker *worker);
1456 pid_t spawn_user_worker(Worker *worker);
1457 pid_t spawn_task_worker(Worker *worker);
1458
// Termination counterparts of the spawn_* functions, one per worker kind; declarations only.
1459 void kill_user_workers();
1460 void kill_event_workers();
1461 void kill_task_workers();
1462
// Static ProcessPool-facing handlers; declarations only.
// NOTE(review): their signatures suggest they are installed as ProcessPool callbacks — confirm.
1463 static int wait_other_worker(ProcessPool *pool, const ExitStatus &exit_status);
1464 static void read_worker_message(ProcessPool *pool, EventData *msg);
1465
// Declaration only; defined elsewhere.
// NOTE(review): presumably flushes data still pending on worker pipes — confirm.
1466 void drain_worker_pipe();
1467
// Declaration only; defined elsewhere. Takes the id of the exited worker and its ExitStatus.
1468 void check_worker_exit_status(int worker_id, const ExitStatus &exit_status);
1469
1470 /**
1471 * [Worker]
1472 */
// Worker-side lifecycle, signal and task (un)packing helpers; declarations only.
// NOTE(review): task_pack / task_unpack presumably are inverses operating on
// EventData payloads — confirm against the definitions.
1473 void worker_start_callback();
1474 void worker_stop_callback();
1475 void worker_accept_event(DataHead *info);
1476 static void worker_signal_handler(int signo);
1477 static void worker_signal_init(void);
1478 static bool task_pack(EventData *task, const void *data, size_t data_len);
1479 static bool task_unpack(EventData *task, String *buffer, PacketPtr *packet);
1480
// ---- private state ----
1481 private:
// Compared against MODE_PROCESS / MODE_BASE by is_process_mode() / is_base_mode().
1482 enum Mode mode_;
// Indexed by fd in get_connection() and get_pipe_object().
1483 Connection *connection_list = nullptr;
// Indexed by `session_id % SW_SESSION_LIST_SIZE` in get_session().
1484 Session *session_list = nullptr;
1485 ServerPortGS *port_gs_list = nullptr;
1486 /**
1487 * http static file directory
1488 */
1489 std::string document_root;
// Mutex behind the public lock() / unlock() helpers.
1490 std::mutex lock_;
// Upper bound checked by get_connection(); read through get_max_connection().
1491 uint32_t max_connection = 0;
1492 TimerNode *enable_accept_timer = nullptr;
1493 std::thread heartbeat_thread;
1494 /**
1495 * The number of pipe per reactor maintenance
1496 */
1497 uint16_t reactor_pipe_num = 0;
// Indexed by reactor id in get_thread().
1498 ReactorThread *reactor_threads = nullptr;
1499
// ---- private start-up / tear-down steps (declarations only; defined elsewhere) ----
// Reactor "threads" vs. "processes" variants come in create/start/destroy
// pairs; presumably chosen according to the server mode — confirm.
1500 int start_check();
1501 void check_port_type(ListenPort *ls);
1502 void destroy();
1503 void destroy_reactor_threads();
1504 void destroy_reactor_processes();
1505 int create_reactor_processes();
1506 int create_reactor_threads();
1507 int start_reactor_threads();
1508 int start_reactor_processes();
1509 int start_master_thread();
1510 int start_event_worker(Worker *worker);
1511 void start_heartbeat_thread();
1512 void join_reactor_thread();
1513 TimerCallback get_timeout_callback(ListenPort *port, Reactor *reactor, Connection *conn);
1514 };
1515
1516 } // namespace swoole
1517
1518 typedef swoole::Server swServer;
1519 typedef swoole::ListenPort swListenPort;
1520 typedef swoole::RecvData swRecvData;
1521
1522 extern swoole::Server *g_server_instance;
1523
sw_server()1524 static inline swoole::Server *sw_server() {
1525 return g_server_instance;
1526 }
1527