1 /*
2 +----------------------------------------------------------------------+
3 | Swoole |
4 +----------------------------------------------------------------------+
5 | This source file is subject to version 2.0 of the Apache license, |
6 | that is bundled with this package in the file LICENSE, and is |
7 | available through the world-wide-web at the following url: |
8 | http://www.apache.org/licenses/LICENSE-2.0.html |
9 | If you did not receive a copy of the Apache2.0 license and are unable|
10 | to obtain it through the world-wide-web, please send a note to |
11 | license@swoole.com so we can mail you a copy immediately. |
12 +----------------------------------------------------------------------+
13 | Author: Tianfeng Han <mikan.tenny@gmail.com> |
14 +----------------------------------------------------------------------+
15 */
16
17 #include <pwd.h>
18 #include <grp.h>
19 #include <sys/uio.h>
20 #include <sys/mman.h>
21
22 #include "swoole_server.h"
23 #include "swoole_memory.h"
24 #include "swoole_msg_queue.h"
25 #include "swoole_client.h"
26 #include "swoole_coroutine.h"
27
28 swoole::WorkerGlobal SwooleWG = {};
29
30 namespace swoole {
31 using namespace network;
32
33 static int Worker_onPipeReceive(Reactor *reactor, Event *event);
34 static int Worker_onStreamAccept(Reactor *reactor, Event *event);
35 static int Worker_onStreamRead(Reactor *reactor, Event *event);
36 static int Worker_onStreamPackage(const Protocol *proto, Socket *sock, const RecvData *rdata);
37 static int Worker_onStreamClose(Reactor *reactor, Event *event);
38 static void Worker_reactor_try_to_exit(Reactor *reactor);
39
worker_signal_init(void)40 void Server::worker_signal_init(void) {
41 /**
42 * use user settings
43 */
44 SwooleG.use_signalfd = SwooleG.enable_signalfd;
45
46 swoole_signal_set(SIGHUP, nullptr);
47 swoole_signal_set(SIGPIPE, SIG_IGN);
48 swoole_signal_set(SIGUSR1, nullptr);
49 swoole_signal_set(SIGUSR2, nullptr);
50 // swSignal_set(SIGINT, Server::worker_signal_handler);
51 swoole_signal_set(SIGTERM, Server::worker_signal_handler);
52 // for test
53 swoole_signal_set(SIGVTALRM, Server::worker_signal_handler);
54 #ifdef SIGRTMIN
55 swoole_signal_set(SIGRTMIN, Server::worker_signal_handler);
56 #endif
57 }
58
worker_signal_handler(int signo)59 void Server::worker_signal_handler(int signo) {
60 if (!SwooleG.running or !sw_server()) {
61 return;
62 }
63 switch (signo) {
64 case SIGTERM:
65 // Event worker
66 if (swoole_event_is_available()) {
67 sw_server()->stop_async_worker(SwooleWG.worker);
68 }
69 // Task worker
70 else {
71 SwooleWG.shutdown = true;
72 }
73 break;
74 // for test
75 case SIGVTALRM:
76 swoole_warning("SIGVTALRM coming");
77 break;
78 case SIGUSR1:
79 case SIGUSR2:
80 if (sw_logger()) {
81 sw_logger()->reopen();
82 }
83 break;
84 default:
85 #ifdef SIGRTMIN
86 if (signo == SIGRTMIN && sw_logger()) {
87 sw_logger()->reopen();
88 }
89 #endif
90 break;
91 }
92 }
93
Worker_discard_data(Server * serv,Connection * conn,DataHead * info)94 static sw_inline bool Worker_discard_data(Server *serv, Connection *conn, DataHead *info) {
95 if (conn == nullptr) {
96 if (serv->disable_notify && !serv->discard_timeout_request) {
97 return false;
98 }
99 goto _discard_data;
100 } else {
101 if (conn->closed) {
102 goto _discard_data;
103 } else {
104 return false;
105 }
106 }
107 _discard_data:
108 swoole_error_log(SW_LOG_WARNING,
109 SW_ERROR_SESSION_DISCARD_TIMEOUT_DATA,
110 "[2] ignore data[%u bytes] received from session#%ld",
111 info->len,
112 info->fd);
113 return true;
114 }
115
Worker_onStreamAccept(Reactor * reactor,Event * event)116 static int Worker_onStreamAccept(Reactor *reactor, Event *event) {
117 Socket *sock = event->socket->accept();
118 if (sock == nullptr) {
119 switch (errno) {
120 case EINTR:
121 case EAGAIN:
122 return SW_OK;
123 default:
124 swoole_sys_warning("accept() failed");
125 return SW_OK;
126 }
127 }
128
129 sock->fd_type = SW_FD_STREAM;
130 sock->socket_type = SW_SOCK_UNIX_STREAM;
131
132 return reactor->add(sock, SW_EVENT_READ);
133 }
134
Worker_onStreamRead(Reactor * reactor,Event * event)135 static int Worker_onStreamRead(Reactor *reactor, Event *event) {
136 Socket *conn = event->socket;
137 Server *serv = (Server *) reactor->ptr;
138 Protocol *protocol = &serv->stream_protocol;
139 String *buffer;
140
141 if (!event->socket->recv_buffer) {
142 if (serv->buffer_pool->empty()) {
143 buffer = new String(SW_BUFFER_SIZE_STD);
144 } else {
145 buffer = serv->buffer_pool->front();
146 serv->buffer_pool->pop();
147 }
148 event->socket->recv_buffer = buffer;
149 } else {
150 buffer = event->socket->recv_buffer;
151 }
152
153 if (protocol->recv_with_length_protocol(conn, buffer) < 0) {
154 Worker_onStreamClose(reactor, event);
155 }
156
157 return SW_OK;
158 }
159
Worker_onStreamClose(Reactor * reactor,Event * event)160 static int Worker_onStreamClose(Reactor *reactor, Event *event) {
161 Socket *sock = event->socket;
162 Server *serv = (Server *) reactor->ptr;
163
164 sock->recv_buffer->clear();
165 serv->buffer_pool->push(sock->recv_buffer);
166 sock->recv_buffer = nullptr;
167
168 reactor->del(sock);
169 reactor->close(reactor, sock);
170
171 if (serv->last_stream_socket == sock) {
172 serv->last_stream_socket = nullptr;
173 }
174
175 return SW_OK;
176 }
177
Worker_onStreamPackage(const Protocol * proto,Socket * sock,const RecvData * rdata)178 static int Worker_onStreamPackage(const Protocol *proto, Socket *sock, const RecvData *rdata) {
179 Server *serv = (Server *) proto->private_data_2;
180
181 SendData task{};
182 memcpy(&task.info, rdata->data + proto->package_length_size, sizeof(task.info));
183 task.info.len = rdata->info.len - (uint32_t) sizeof(task.info) - proto->package_length_size;
184 if (task.info.len > 0) {
185 task.data = (char *) (rdata->data + proto->package_length_size + sizeof(task.info));
186 }
187
188 serv->last_stream_socket = sock;
189 serv->message_bus.pass(&task);
190 serv->worker_accept_event(&serv->message_bus.get_buffer()->info);
191 serv->last_stream_socket = nullptr;
192
193 int _end = 0;
194 swoole_event_write(sock, (void *) &_end, sizeof(_end));
195
196 return SW_OK;
197 }
198
199 typedef std::function<int(Server *, RecvData *)> TaskCallback;
200
Worker_do_task(Server * serv,Worker * worker,DataHead * info,const TaskCallback & callback)201 static sw_inline void Worker_do_task(Server *serv, Worker *worker, DataHead *info, const TaskCallback &callback) {
202 RecvData recv_data;
203 auto packet = serv->message_bus.get_packet();
204 recv_data.info = *info;
205 recv_data.info.len = packet.length;
206 recv_data.data = packet.data;
207
208 if (callback(serv, &recv_data) == SW_OK) {
209 worker->request_count++;
210 sw_atomic_fetch_add(&serv->gs->request_count, 1);
211 }
212 }
213
worker_accept_event(DataHead * info)214 void Server::worker_accept_event(DataHead *info) {
215 Worker *worker = SwooleWG.worker;
216 // worker busy
217 worker->status = SW_WORKER_BUSY;
218
219 switch (info->type) {
220 case SW_SERVER_EVENT_RECV_DATA: {
221 Connection *conn = get_connection_verify(info->fd);
222 if (conn) {
223 if (info->len > 0) {
224 auto packet = message_bus.get_packet();
225 sw_atomic_fetch_sub(&conn->recv_queued_bytes, packet.length);
226 swoole_trace_log(SW_TRACE_SERVER,
227 "[Worker] session_id=%ld, len=%lu, qb=%d",
228 conn->session_id,
229 packet.length,
230 conn->recv_queued_bytes);
231 }
232 conn->last_dispatch_time = info->time;
233 }
234 if (!Worker_discard_data(this, conn, info)) {
235 Worker_do_task(this, worker, info, onReceive);
236 }
237 break;
238 }
239 case SW_SERVER_EVENT_RECV_DGRAM: {
240 Worker_do_task(this, worker, info, onPacket);
241 break;
242 }
243 case SW_SERVER_EVENT_CLOSE: {
244 #ifdef SW_USE_OPENSSL
245 Connection *conn = get_connection_verify_no_ssl(info->fd);
246 if (conn && conn->ssl_client_cert && conn->ssl_client_cert_pid == SwooleG.pid) {
247 delete conn->ssl_client_cert;
248 conn->ssl_client_cert = nullptr;
249 }
250 #endif
251 factory->end(info->fd, false);
252 break;
253 }
254 case SW_SERVER_EVENT_CONNECT: {
255 #ifdef SW_USE_OPENSSL
256 // SSL client certificate
257 if (info->len > 0) {
258 Connection *conn = get_connection_verify_no_ssl(info->fd);
259 if (conn) {
260 auto packet = message_bus.get_packet();
261 conn->ssl_client_cert = new String(packet.data, packet.length);
262 conn->ssl_client_cert_pid = SwooleG.pid;
263 }
264 }
265 #endif
266 if (onConnect) {
267 onConnect(this, info);
268 }
269 break;
270 }
271
272 case SW_SERVER_EVENT_BUFFER_FULL: {
273 if (onBufferFull) {
274 onBufferFull(this, info);
275 }
276 break;
277 }
278 case SW_SERVER_EVENT_BUFFER_EMPTY: {
279 if (onBufferEmpty) {
280 onBufferEmpty(this, info);
281 }
282 break;
283 }
284 case SW_SERVER_EVENT_FINISH: {
285 onFinish(this, (EventData *) message_bus.get_buffer());
286 break;
287 }
288 case SW_SERVER_EVENT_PIPE_MESSAGE: {
289 onPipeMessage(this, (EventData *) message_bus.get_buffer());
290 break;
291 }
292 case SW_SERVER_EVENT_COMMAND_REQUEST: {
293 call_command_handler(message_bus, worker->id, pipe_command->get_socket(false));
294 break;
295 }
296 default:
297 swoole_warning("[Worker] error event[type=%d]", (int) info->type);
298 break;
299 }
300
301 // worker idle
302 worker->status = SW_WORKER_IDLE;
303
304 // maximum number of requests, process will exit.
305 if (!SwooleWG.run_always && worker->request_count >= SwooleWG.max_request) {
306 stop_async_worker(worker);
307 }
308 }
309
worker_start_callback()310 void Server::worker_start_callback() {
311 if (SwooleG.process_id >= worker_num) {
312 SwooleG.process_type = SW_PROCESS_TASKWORKER;
313 } else {
314 SwooleG.process_type = SW_PROCESS_WORKER;
315 }
316
317 int is_root = !geteuid();
318 struct passwd *_passwd = nullptr;
319 struct group *_group = nullptr;
320
321 if (is_root) {
322 // get group info
323 if (!group_.empty()) {
324 _group = getgrnam(group_.c_str());
325 if (!_group) {
326 swoole_warning("get group [%s] info failed", group_.c_str());
327 }
328 }
329 // get user info
330 if (!user_.empty()) {
331 _passwd = getpwnam(user_.c_str());
332 if (!_passwd) {
333 swoole_warning("get user [%s] info failed", user_.c_str());
334 }
335 }
336 // set process group
337 if (_group && setgid(_group->gr_gid) < 0) {
338 swoole_sys_warning("setgid to [%s] failed", group_.c_str());
339 }
340 // set process user
341 if (_passwd && setuid(_passwd->pw_uid) < 0) {
342 swoole_sys_warning("setuid to [%s] failed", user_.c_str());
343 }
344 // chroot
345 if (!chroot_.empty()) {
346 if (::chroot(chroot_.c_str()) == 0) {
347 if (chdir("/") < 0) {
348 swoole_sys_warning("chdir(\"/\") failed");
349 }
350 } else {
351 swoole_sys_warning("chroot(\"%s\") failed", chroot_.c_str());
352 }
353 }
354 }
355
356 SW_LOOP_N(worker_num + task_worker_num) {
357 Worker *worker = get_worker(i);
358 if (SwooleG.process_id == i) {
359 continue;
360 }
361 if (is_worker() && worker->pipe_master) {
362 worker->pipe_master->set_nonblock();
363 }
364 }
365
366 if (sw_logger()->is_opened()) {
367 sw_logger()->reopen();
368 }
369
370 SwooleWG.worker = get_worker(SwooleG.process_id);
371 SwooleWG.worker->status = SW_WORKER_IDLE;
372
373 #ifdef HAVE_SIGNALFD
374 if (SwooleG.use_signalfd && SwooleTG.reactor && SwooleG.signal_fd == 0) {
375 swoole_signalfd_setup(SwooleTG.reactor);
376 }
377 #endif
378
379 if (is_process_mode()) {
380 sw_shm_protect(session_list, PROT_READ);
381 }
382
383 call_worker_start_callback(SwooleWG.worker);
384 }
385
worker_stop_callback()386 void Server::worker_stop_callback() {
387 void *hook_args[2];
388 hook_args[0] = this;
389 hook_args[1] = (void *) (uintptr_t) SwooleG.process_id;
390 if (swoole_isset_hook(SW_GLOBAL_HOOK_BEFORE_WORKER_STOP)) {
391 swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_WORKER_STOP, hook_args);
392 }
393 if (onWorkerStop) {
394 onWorkerStop(this, SwooleG.process_id);
395 }
396 if (!message_bus.empty()) {
397 swoole_error_log(
398 SW_LOG_WARNING, SW_ERROR_SERVER_WORKER_UNPROCESSED_DATA, "unprocessed data in the worker process buffer");
399 message_bus.clear();
400 }
401 }
402
stop_async_worker(Worker * worker)403 void Server::stop_async_worker(Worker *worker) {
404 worker->status = SW_WORKER_EXIT;
405 Reactor *reactor = SwooleTG.reactor;
406
407 /**
408 * force to end.
409 */
410 if (reload_async == 0) {
411 running = false;
412 reactor->running = false;
413 return;
414 }
415
416 // The worker process is shutting down now.
417 if (reactor->wait_exit) {
418 return;
419 }
420
421 // Separated from the event worker process pool
422 worker = (Worker *) sw_malloc(sizeof(*worker));
423 *worker = *SwooleWG.worker;
424 SwooleWG.worker = worker;
425
426 if (stream_socket) {
427 reactor->del(stream_socket);
428 stream_socket->free();
429 stream_socket = nullptr;
430 }
431
432 if (worker->pipe_worker && !worker->pipe_worker->removed) {
433 reactor->remove_read_event(worker->pipe_worker);
434 }
435
436 if (is_base_mode()) {
437 if (is_worker()) {
438 if (worker->id == 0 && gs->event_workers.running == 0) {
439 if (swoole_isset_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_SHUTDOWN)) {
440 swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_SHUTDOWN, this);
441 }
442 if (onBeforeShutdown) {
443 onBeforeShutdown(this);
444 }
445 }
446 for (auto ls : ports) {
447 reactor->del(ls->socket);
448 }
449 if (worker->pipe_master && !worker->pipe_master->removed) {
450 reactor->remove_read_event(worker->pipe_master);
451 }
452 foreach_connection([reactor](Connection *conn) {
453 if (!conn->peer_closed && !conn->socket->removed) {
454 reactor->remove_read_event(conn->socket);
455 }
456 });
457 clear_timer();
458 }
459 } else {
460 WorkerStopMessage msg;
461 msg.pid = SwooleG.pid;
462 msg.worker_id = SwooleG.process_id;
463
464 if (gs->event_workers.push_message(SW_WORKER_MESSAGE_STOP, &msg, sizeof(msg)) < 0) {
465 running = 0;
466 }
467 }
468
469 reactor->set_wait_exit(true);
470 reactor->set_end_callback(Reactor::PRIORITY_TRY_EXIT, Worker_reactor_try_to_exit);
471 SwooleWG.exit_time = ::time(nullptr);
472
473 Worker_reactor_try_to_exit(reactor);
474 if (!reactor->running) {
475 running = false;
476 }
477 }
478
Worker_reactor_try_to_exit(Reactor * reactor)479 static void Worker_reactor_try_to_exit(Reactor *reactor) {
480 Server *serv;
481 if (SwooleG.process_type == SW_PROCESS_TASKWORKER) {
482 ProcessPool *pool = (ProcessPool *) reactor->ptr;
483 serv = (Server *) pool->ptr;
484 } else {
485 serv = (Server *) reactor->ptr;
486 }
487 uint8_t call_worker_exit_func = 0;
488
489 while (1) {
490 if (reactor->if_exit()) {
491 reactor->running = false;
492 break;
493 } else {
494 if (serv->onWorkerExit && call_worker_exit_func == 0) {
495 serv->onWorkerExit(serv, SwooleG.process_id);
496 call_worker_exit_func = 1;
497 continue;
498 }
499 int remaining_time = serv->max_wait_time - (::time(nullptr) - SwooleWG.exit_time);
500 if (remaining_time <= 0) {
501 swoole_error_log(
502 SW_LOG_WARNING, SW_ERROR_SERVER_WORKER_EXIT_TIMEOUT, "worker exit timeout, forced termination");
503 reactor->running = false;
504 break;
505 } else {
506 int timeout_msec = remaining_time * 1000;
507 if (reactor->timeout_msec < 0 || reactor->timeout_msec > timeout_msec) {
508 reactor->timeout_msec = timeout_msec;
509 }
510 }
511 }
512 break;
513 }
514 }
515
drain_worker_pipe()516 void Server::drain_worker_pipe() {
517 for (uint32_t i = 0; i < worker_num + task_worker_num; i++) {
518 Worker *worker = get_worker(i);
519 if (sw_reactor()) {
520 if (worker->pipe_worker) {
521 sw_reactor()->drain_write_buffer(worker->pipe_worker);
522 }
523 if (worker->pipe_master) {
524 sw_reactor()->drain_write_buffer(worker->pipe_master);
525 }
526 }
527 }
528 }
529
530 /**
531 * main loop [Worker]
532 */
start_event_worker(Worker * worker)533 int Server::start_event_worker(Worker *worker) {
534 // worker_id
535 SwooleG.process_id = worker->id;
536
537 init_worker(worker);
538
539 if (swoole_event_init(0) < 0) {
540 return SW_ERR;
541 }
542
543 Reactor *reactor = SwooleTG.reactor;
544 /**
545 * set pipe buffer size
546 */
547 for (uint32_t i = 0; i < worker_num + task_worker_num; i++) {
548 Worker *_worker = get_worker(i);
549 if (_worker->pipe_master) {
550 _worker->pipe_master->buffer_size = UINT_MAX;
551 }
552 if (_worker->pipe_worker) {
553 _worker->pipe_worker->buffer_size = UINT_MAX;
554 }
555 }
556
557 worker->pipe_worker->set_nonblock();
558 reactor->ptr = this;
559 reactor->add(worker->pipe_worker, SW_EVENT_READ);
560 reactor->set_handler(SW_FD_PIPE, Worker_onPipeReceive);
561
562 if (dispatch_mode == DISPATCH_STREAM) {
563 reactor->add(stream_socket, SW_EVENT_READ);
564 reactor->set_handler(SW_FD_STREAM_SERVER, Worker_onStreamAccept);
565 reactor->set_handler(SW_FD_STREAM, Worker_onStreamRead);
566 network::Stream::set_protocol(&stream_protocol);
567 stream_protocol.private_data_2 = this;
568 stream_protocol.package_max_length = UINT_MAX;
569 stream_protocol.onPackage = Worker_onStreamPackage;
570 buffer_pool = new std::queue<String *>;
571 } else if (dispatch_mode == DISPATCH_CO_CONN_LB || dispatch_mode == DISPATCH_CO_REQ_LB) {
572 reactor->set_end_callback(Reactor::PRIORITY_WORKER_CALLBACK,
573 [worker](Reactor *) { worker->coroutine_num = Coroutine::count(); });
574 }
575
576 worker->status = SW_WORKER_IDLE;
577 worker_start_callback();
578
579 // main loop
580 reactor->wait(nullptr);
581 // drain pipe buffer
582 drain_worker_pipe();
583 // reactor free
584 swoole_event_free();
585 // worker shutdown
586 worker_stop_callback();
587
588 if (buffer_pool) {
589 delete buffer_pool;
590 }
591
592 return SW_OK;
593 }
594
595 /**
596 * [Worker/TaskWorker/Master] Send data to ReactorThread
597 */
send_to_reactor_thread(const EventData * ev_data,size_t sendn,SessionId session_id)598 ssize_t Server::send_to_reactor_thread(const EventData *ev_data, size_t sendn, SessionId session_id) {
599 Socket *pipe_sock = get_reactor_pipe_socket(session_id, ev_data->info.reactor_id);
600 if (swoole_event_is_available()) {
601 return swoole_event_write(pipe_sock, ev_data, sendn);
602 } else {
603 return pipe_sock->send_blocking(ev_data, sendn);
604 }
605 }
606
607 /**
608 * send message from worker to another worker
609 */
send_to_worker_from_worker(Worker * dst_worker,const void * buf,size_t len,int flags)610 ssize_t Server::send_to_worker_from_worker(Worker *dst_worker, const void *buf, size_t len, int flags) {
611 return dst_worker->send_pipe_message(buf, len, flags);
612 }
613
614 /**
615 * receive data from reactor
616 */
Worker_onPipeReceive(Reactor * reactor,Event * event)617 static int Worker_onPipeReceive(Reactor *reactor, Event *event) {
618 Server *serv = (Server *) reactor->ptr;
619 PipeBuffer *pipe_buffer = serv->message_bus.get_buffer();
620
621 if (serv->message_bus.read(event->socket) <= 0) {
622 return SW_OK;
623 }
624
625 serv->worker_accept_event(&pipe_buffer->info);
626 serv->message_bus.pop();
627
628 return SW_OK;
629 }
630
send_pipe_message(const void * buf,size_t n,int flags)631 ssize_t Worker::send_pipe_message(const void *buf, size_t n, int flags) {
632 Socket *pipe_sock;
633
634 if (flags & SW_PIPE_MASTER) {
635 pipe_sock = pipe_master;
636 } else {
637 pipe_sock = pipe_worker;
638 }
639
640 // message-queue
641 if (pool->use_msgqueue) {
642 struct {
643 long mtype;
644 EventData buf;
645 } msg;
646
647 msg.mtype = id + 1;
648 memcpy(&msg.buf, buf, n);
649
650 return pool->queue->push((QueueNode *) &msg, n) ? n : -1;
651 }
652
653 if ((flags & SW_PIPE_NONBLOCK) && swoole_event_is_available()) {
654 return swoole_event_write(pipe_sock, buf, n);
655 } else {
656 return pipe_sock->send_blocking(buf, n);
657 }
658 }
659 } // namespace swoole
660