1 /*
2   +----------------------------------------------------------------------+
3   | Swoole                                                               |
4   +----------------------------------------------------------------------+
5   | This source file is subject to version 2.0 of the Apache license,    |
6   | that is bundled with this package in the file LICENSE, and is        |
7   | available through the world-wide-web at the following url:           |
8   | http://www.apache.org/licenses/LICENSE-2.0.html                      |
9   | If you did not receive a copy of the Apache2.0 license and are unable|
10   | to obtain it through the world-wide-web, please send a note to       |
11   | license@swoole.com so we can mail you a copy immediately.            |
12   +----------------------------------------------------------------------+
13   | Author: Tianfeng Han  <mikan.tenny@gmail.com>                        |
14   +----------------------------------------------------------------------+
15 */
16 
17 #include <pwd.h>
18 #include <grp.h>
19 #include <sys/uio.h>
20 #include <sys/mman.h>
21 
22 #include "swoole_server.h"
23 #include "swoole_memory.h"
24 #include "swoole_msg_queue.h"
25 #include "swoole_client.h"
26 #include "swoole_coroutine.h"
27 
28 swoole::WorkerGlobal SwooleWG = {};
29 
30 namespace swoole {
31 using namespace network;
32 
33 static int Worker_onPipeReceive(Reactor *reactor, Event *event);
34 static int Worker_onStreamAccept(Reactor *reactor, Event *event);
35 static int Worker_onStreamRead(Reactor *reactor, Event *event);
36 static int Worker_onStreamPackage(const Protocol *proto, Socket *sock, const RecvData *rdata);
37 static int Worker_onStreamClose(Reactor *reactor, Event *event);
38 static void Worker_reactor_try_to_exit(Reactor *reactor);
39 
worker_signal_init(void)40 void Server::worker_signal_init(void) {
41     /**
42      * use user settings
43      */
44     SwooleG.use_signalfd = SwooleG.enable_signalfd;
45 
46     swoole_signal_set(SIGHUP, nullptr);
47     swoole_signal_set(SIGPIPE, SIG_IGN);
48     swoole_signal_set(SIGUSR1, nullptr);
49     swoole_signal_set(SIGUSR2, nullptr);
50     // swSignal_set(SIGINT, Server::worker_signal_handler);
51     swoole_signal_set(SIGTERM, Server::worker_signal_handler);
52     // for test
53     swoole_signal_set(SIGVTALRM, Server::worker_signal_handler);
54 #ifdef SIGRTMIN
55     swoole_signal_set(SIGRTMIN, Server::worker_signal_handler);
56 #endif
57 }
58 
worker_signal_handler(int signo)59 void Server::worker_signal_handler(int signo) {
60     if (!SwooleG.running or !sw_server()) {
61         return;
62     }
63     switch (signo) {
64     case SIGTERM:
65         // Event worker
66         if (swoole_event_is_available()) {
67             sw_server()->stop_async_worker(SwooleWG.worker);
68         }
69         // Task worker
70         else {
71             SwooleWG.shutdown = true;
72         }
73         break;
74     // for test
75     case SIGVTALRM:
76         swoole_warning("SIGVTALRM coming");
77         break;
78     case SIGUSR1:
79     case SIGUSR2:
80         if (sw_logger()) {
81             sw_logger()->reopen();
82         }
83         break;
84     default:
85 #ifdef SIGRTMIN
86         if (signo == SIGRTMIN && sw_logger()) {
87             sw_logger()->reopen();
88         }
89 #endif
90         break;
91     }
92 }
93 
Worker_discard_data(Server * serv,Connection * conn,DataHead * info)94 static sw_inline bool Worker_discard_data(Server *serv, Connection *conn, DataHead *info) {
95     if (conn == nullptr) {
96         if (serv->disable_notify && !serv->discard_timeout_request) {
97             return false;
98         }
99         goto _discard_data;
100     } else {
101         if (conn->closed) {
102             goto _discard_data;
103         } else {
104             return false;
105         }
106     }
107 _discard_data:
108     swoole_error_log(SW_LOG_WARNING,
109                      SW_ERROR_SESSION_DISCARD_TIMEOUT_DATA,
110                      "[2] ignore data[%u bytes] received from session#%ld",
111                      info->len,
112                      info->fd);
113     return true;
114 }
115 
Worker_onStreamAccept(Reactor * reactor,Event * event)116 static int Worker_onStreamAccept(Reactor *reactor, Event *event) {
117     Socket *sock = event->socket->accept();
118     if (sock == nullptr) {
119         switch (errno) {
120         case EINTR:
121         case EAGAIN:
122             return SW_OK;
123         default:
124             swoole_sys_warning("accept() failed");
125             return SW_OK;
126         }
127     }
128 
129     sock->fd_type = SW_FD_STREAM;
130     sock->socket_type = SW_SOCK_UNIX_STREAM;
131 
132     return reactor->add(sock, SW_EVENT_READ);
133 }
134 
Worker_onStreamRead(Reactor * reactor,Event * event)135 static int Worker_onStreamRead(Reactor *reactor, Event *event) {
136     Socket *conn = event->socket;
137     Server *serv = (Server *) reactor->ptr;
138     Protocol *protocol = &serv->stream_protocol;
139     String *buffer;
140 
141     if (!event->socket->recv_buffer) {
142         if (serv->buffer_pool->empty()) {
143             buffer = new String(SW_BUFFER_SIZE_STD);
144         } else {
145             buffer = serv->buffer_pool->front();
146             serv->buffer_pool->pop();
147         }
148         event->socket->recv_buffer = buffer;
149     } else {
150         buffer = event->socket->recv_buffer;
151     }
152 
153     if (protocol->recv_with_length_protocol(conn, buffer) < 0) {
154         Worker_onStreamClose(reactor, event);
155     }
156 
157     return SW_OK;
158 }
159 
Worker_onStreamClose(Reactor * reactor,Event * event)160 static int Worker_onStreamClose(Reactor *reactor, Event *event) {
161     Socket *sock = event->socket;
162     Server *serv = (Server *) reactor->ptr;
163 
164     sock->recv_buffer->clear();
165     serv->buffer_pool->push(sock->recv_buffer);
166     sock->recv_buffer = nullptr;
167 
168     reactor->del(sock);
169     reactor->close(reactor, sock);
170 
171     if (serv->last_stream_socket == sock) {
172         serv->last_stream_socket = nullptr;
173     }
174 
175     return SW_OK;
176 }
177 
Worker_onStreamPackage(const Protocol * proto,Socket * sock,const RecvData * rdata)178 static int Worker_onStreamPackage(const Protocol *proto, Socket *sock, const RecvData *rdata) {
179     Server *serv = (Server *) proto->private_data_2;
180 
181     SendData task{};
182     memcpy(&task.info, rdata->data + proto->package_length_size, sizeof(task.info));
183     task.info.len = rdata->info.len - (uint32_t) sizeof(task.info) - proto->package_length_size;
184     if (task.info.len > 0) {
185         task.data = (char *) (rdata->data + proto->package_length_size + sizeof(task.info));
186     }
187 
188     serv->last_stream_socket = sock;
189     serv->message_bus.pass(&task);
190     serv->worker_accept_event(&serv->message_bus.get_buffer()->info);
191     serv->last_stream_socket = nullptr;
192 
193     int _end = 0;
194     swoole_event_write(sock, (void *) &_end, sizeof(_end));
195 
196     return SW_OK;
197 }
198 
199 typedef std::function<int(Server *, RecvData *)> TaskCallback;
200 
Worker_do_task(Server * serv,Worker * worker,DataHead * info,const TaskCallback & callback)201 static sw_inline void Worker_do_task(Server *serv, Worker *worker, DataHead *info, const TaskCallback &callback) {
202     RecvData recv_data;
203     auto packet = serv->message_bus.get_packet();
204     recv_data.info = *info;
205     recv_data.info.len = packet.length;
206     recv_data.data = packet.data;
207 
208     if (callback(serv, &recv_data) == SW_OK) {
209         worker->request_count++;
210         sw_atomic_fetch_add(&serv->gs->request_count, 1);
211     }
212 }
213 
worker_accept_event(DataHead * info)214 void Server::worker_accept_event(DataHead *info) {
215     Worker *worker = SwooleWG.worker;
216     // worker busy
217     worker->status = SW_WORKER_BUSY;
218 
219     switch (info->type) {
220     case SW_SERVER_EVENT_RECV_DATA: {
221         Connection *conn = get_connection_verify(info->fd);
222         if (conn) {
223             if (info->len > 0) {
224                 auto packet = message_bus.get_packet();
225                 sw_atomic_fetch_sub(&conn->recv_queued_bytes, packet.length);
226                 swoole_trace_log(SW_TRACE_SERVER,
227                                  "[Worker] session_id=%ld, len=%lu, qb=%d",
228                                  conn->session_id,
229                                  packet.length,
230                                  conn->recv_queued_bytes);
231             }
232             conn->last_dispatch_time = info->time;
233         }
234         if (!Worker_discard_data(this, conn, info)) {
235             Worker_do_task(this, worker, info, onReceive);
236         }
237         break;
238     }
239     case SW_SERVER_EVENT_RECV_DGRAM: {
240         Worker_do_task(this, worker, info, onPacket);
241         break;
242     }
243     case SW_SERVER_EVENT_CLOSE: {
244 #ifdef SW_USE_OPENSSL
245         Connection *conn = get_connection_verify_no_ssl(info->fd);
246         if (conn && conn->ssl_client_cert && conn->ssl_client_cert_pid == SwooleG.pid) {
247             delete conn->ssl_client_cert;
248             conn->ssl_client_cert = nullptr;
249         }
250 #endif
251         factory->end(info->fd, false);
252         break;
253     }
254     case SW_SERVER_EVENT_CONNECT: {
255 #ifdef SW_USE_OPENSSL
256         // SSL client certificate
257         if (info->len > 0) {
258             Connection *conn = get_connection_verify_no_ssl(info->fd);
259             if (conn) {
260                 auto packet = message_bus.get_packet();
261                 conn->ssl_client_cert = new String(packet.data, packet.length);
262                 conn->ssl_client_cert_pid = SwooleG.pid;
263             }
264         }
265 #endif
266         if (onConnect) {
267             onConnect(this, info);
268         }
269         break;
270     }
271 
272     case SW_SERVER_EVENT_BUFFER_FULL: {
273         if (onBufferFull) {
274             onBufferFull(this, info);
275         }
276         break;
277     }
278     case SW_SERVER_EVENT_BUFFER_EMPTY: {
279         if (onBufferEmpty) {
280             onBufferEmpty(this, info);
281         }
282         break;
283     }
284     case SW_SERVER_EVENT_FINISH: {
285         onFinish(this, (EventData *) message_bus.get_buffer());
286         break;
287     }
288     case SW_SERVER_EVENT_PIPE_MESSAGE: {
289         onPipeMessage(this, (EventData *) message_bus.get_buffer());
290         break;
291     }
292     case SW_SERVER_EVENT_COMMAND_REQUEST: {
293         call_command_handler(message_bus, worker->id, pipe_command->get_socket(false));
294         break;
295     }
296     default:
297         swoole_warning("[Worker] error event[type=%d]", (int) info->type);
298         break;
299     }
300 
301     // worker idle
302     worker->status = SW_WORKER_IDLE;
303 
304     // maximum number of requests, process will exit.
305     if (!SwooleWG.run_always && worker->request_count >= SwooleWG.max_request) {
306         stop_async_worker(worker);
307     }
308 }
309 
worker_start_callback()310 void Server::worker_start_callback() {
311     if (SwooleG.process_id >= worker_num) {
312         SwooleG.process_type = SW_PROCESS_TASKWORKER;
313     } else {
314         SwooleG.process_type = SW_PROCESS_WORKER;
315     }
316 
317     int is_root = !geteuid();
318     struct passwd *_passwd = nullptr;
319     struct group *_group = nullptr;
320 
321     if (is_root) {
322         // get group info
323         if (!group_.empty()) {
324             _group = getgrnam(group_.c_str());
325             if (!_group) {
326                 swoole_warning("get group [%s] info failed", group_.c_str());
327             }
328         }
329         // get user info
330         if (!user_.empty()) {
331             _passwd = getpwnam(user_.c_str());
332             if (!_passwd) {
333                 swoole_warning("get user [%s] info failed", user_.c_str());
334             }
335         }
336         // set process group
337         if (_group && setgid(_group->gr_gid) < 0) {
338             swoole_sys_warning("setgid to [%s] failed", group_.c_str());
339         }
340         // set process user
341         if (_passwd && setuid(_passwd->pw_uid) < 0) {
342             swoole_sys_warning("setuid to [%s] failed", user_.c_str());
343         }
344         // chroot
345         if (!chroot_.empty()) {
346             if (::chroot(chroot_.c_str()) == 0) {
347                 if (chdir("/") < 0) {
348                     swoole_sys_warning("chdir(\"/\") failed");
349                 }
350             } else {
351                 swoole_sys_warning("chroot(\"%s\") failed", chroot_.c_str());
352             }
353         }
354     }
355 
356     SW_LOOP_N(worker_num + task_worker_num) {
357         Worker *worker = get_worker(i);
358         if (SwooleG.process_id == i) {
359             continue;
360         }
361         if (is_worker() && worker->pipe_master) {
362             worker->pipe_master->set_nonblock();
363         }
364     }
365 
366     if (sw_logger()->is_opened()) {
367         sw_logger()->reopen();
368     }
369 
370     SwooleWG.worker = get_worker(SwooleG.process_id);
371     SwooleWG.worker->status = SW_WORKER_IDLE;
372 
373 #ifdef HAVE_SIGNALFD
374     if (SwooleG.use_signalfd && SwooleTG.reactor && SwooleG.signal_fd == 0) {
375         swoole_signalfd_setup(SwooleTG.reactor);
376     }
377 #endif
378 
379     if (is_process_mode()) {
380         sw_shm_protect(session_list, PROT_READ);
381     }
382 
383     call_worker_start_callback(SwooleWG.worker);
384 }
385 
worker_stop_callback()386 void Server::worker_stop_callback() {
387     void *hook_args[2];
388     hook_args[0] = this;
389     hook_args[1] = (void *) (uintptr_t) SwooleG.process_id;
390     if (swoole_isset_hook(SW_GLOBAL_HOOK_BEFORE_WORKER_STOP)) {
391         swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_WORKER_STOP, hook_args);
392     }
393     if (onWorkerStop) {
394         onWorkerStop(this, SwooleG.process_id);
395     }
396     if (!message_bus.empty()) {
397         swoole_error_log(
398             SW_LOG_WARNING, SW_ERROR_SERVER_WORKER_UNPROCESSED_DATA, "unprocessed data in the worker process buffer");
399         message_bus.clear();
400     }
401 }
402 
stop_async_worker(Worker * worker)403 void Server::stop_async_worker(Worker *worker) {
404     worker->status = SW_WORKER_EXIT;
405     Reactor *reactor = SwooleTG.reactor;
406 
407     /**
408      * force to end.
409      */
410     if (reload_async == 0) {
411         running = false;
412         reactor->running = false;
413         return;
414     }
415 
416     // The worker process is shutting down now.
417     if (reactor->wait_exit) {
418         return;
419     }
420 
421     // Separated from the event worker process pool
422     worker = (Worker *) sw_malloc(sizeof(*worker));
423     *worker = *SwooleWG.worker;
424     SwooleWG.worker = worker;
425 
426     if (stream_socket) {
427         reactor->del(stream_socket);
428         stream_socket->free();
429         stream_socket = nullptr;
430     }
431 
432     if (worker->pipe_worker && !worker->pipe_worker->removed) {
433         reactor->remove_read_event(worker->pipe_worker);
434     }
435 
436     if (is_base_mode()) {
437         if (is_worker()) {
438             if (worker->id == 0 && gs->event_workers.running == 0) {
439                 if (swoole_isset_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_SHUTDOWN)) {
440                     swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_SHUTDOWN, this);
441                 }
442                 if (onBeforeShutdown) {
443                     onBeforeShutdown(this);
444                 }
445             }
446             for (auto ls : ports) {
447                 reactor->del(ls->socket);
448             }
449             if (worker->pipe_master && !worker->pipe_master->removed) {
450                 reactor->remove_read_event(worker->pipe_master);
451             }
452             foreach_connection([reactor](Connection *conn) {
453                 if (!conn->peer_closed && !conn->socket->removed) {
454                     reactor->remove_read_event(conn->socket);
455                 }
456             });
457             clear_timer();
458         }
459     } else {
460         WorkerStopMessage msg;
461         msg.pid = SwooleG.pid;
462         msg.worker_id = SwooleG.process_id;
463 
464         if (gs->event_workers.push_message(SW_WORKER_MESSAGE_STOP, &msg, sizeof(msg)) < 0) {
465             running = 0;
466         }
467     }
468 
469     reactor->set_wait_exit(true);
470     reactor->set_end_callback(Reactor::PRIORITY_TRY_EXIT, Worker_reactor_try_to_exit);
471     SwooleWG.exit_time = ::time(nullptr);
472 
473     Worker_reactor_try_to_exit(reactor);
474     if (!reactor->running) {
475         running = false;
476     }
477 }
478 
Worker_reactor_try_to_exit(Reactor * reactor)479 static void Worker_reactor_try_to_exit(Reactor *reactor) {
480     Server *serv;
481     if (SwooleG.process_type == SW_PROCESS_TASKWORKER) {
482         ProcessPool *pool = (ProcessPool *) reactor->ptr;
483         serv = (Server *) pool->ptr;
484     } else {
485         serv = (Server *) reactor->ptr;
486     }
487     uint8_t call_worker_exit_func = 0;
488 
489     while (1) {
490         if (reactor->if_exit()) {
491             reactor->running = false;
492             break;
493         } else {
494             if (serv->onWorkerExit && call_worker_exit_func == 0) {
495                 serv->onWorkerExit(serv, SwooleG.process_id);
496                 call_worker_exit_func = 1;
497                 continue;
498             }
499             int remaining_time = serv->max_wait_time - (::time(nullptr) - SwooleWG.exit_time);
500             if (remaining_time <= 0) {
501                 swoole_error_log(
502                     SW_LOG_WARNING, SW_ERROR_SERVER_WORKER_EXIT_TIMEOUT, "worker exit timeout, forced termination");
503                 reactor->running = false;
504                 break;
505             } else {
506                 int timeout_msec = remaining_time * 1000;
507                 if (reactor->timeout_msec < 0 || reactor->timeout_msec > timeout_msec) {
508                     reactor->timeout_msec = timeout_msec;
509                 }
510             }
511         }
512         break;
513     }
514 }
515 
drain_worker_pipe()516 void Server::drain_worker_pipe() {
517     for (uint32_t i = 0; i < worker_num + task_worker_num; i++) {
518         Worker *worker = get_worker(i);
519         if (sw_reactor()) {
520             if (worker->pipe_worker) {
521                 sw_reactor()->drain_write_buffer(worker->pipe_worker);
522             }
523             if (worker->pipe_master) {
524                 sw_reactor()->drain_write_buffer(worker->pipe_master);
525             }
526         }
527     }
528 }
529 
530 /**
531  * main loop [Worker]
532  */
start_event_worker(Worker * worker)533 int Server::start_event_worker(Worker *worker) {
534     // worker_id
535     SwooleG.process_id = worker->id;
536 
537     init_worker(worker);
538 
539     if (swoole_event_init(0) < 0) {
540         return SW_ERR;
541     }
542 
543     Reactor *reactor = SwooleTG.reactor;
544     /**
545      * set pipe buffer size
546      */
547     for (uint32_t i = 0; i < worker_num + task_worker_num; i++) {
548         Worker *_worker = get_worker(i);
549         if (_worker->pipe_master) {
550             _worker->pipe_master->buffer_size = UINT_MAX;
551         }
552         if (_worker->pipe_worker) {
553             _worker->pipe_worker->buffer_size = UINT_MAX;
554         }
555     }
556 
557     worker->pipe_worker->set_nonblock();
558     reactor->ptr = this;
559     reactor->add(worker->pipe_worker, SW_EVENT_READ);
560     reactor->set_handler(SW_FD_PIPE, Worker_onPipeReceive);
561 
562     if (dispatch_mode == DISPATCH_STREAM) {
563         reactor->add(stream_socket, SW_EVENT_READ);
564         reactor->set_handler(SW_FD_STREAM_SERVER, Worker_onStreamAccept);
565         reactor->set_handler(SW_FD_STREAM, Worker_onStreamRead);
566         network::Stream::set_protocol(&stream_protocol);
567         stream_protocol.private_data_2 = this;
568         stream_protocol.package_max_length = UINT_MAX;
569         stream_protocol.onPackage = Worker_onStreamPackage;
570         buffer_pool = new std::queue<String *>;
571     } else if (dispatch_mode == DISPATCH_CO_CONN_LB || dispatch_mode == DISPATCH_CO_REQ_LB) {
572         reactor->set_end_callback(Reactor::PRIORITY_WORKER_CALLBACK,
573                                   [worker](Reactor *) { worker->coroutine_num = Coroutine::count(); });
574     }
575 
576     worker->status = SW_WORKER_IDLE;
577     worker_start_callback();
578 
579     // main loop
580     reactor->wait(nullptr);
581     // drain pipe buffer
582     drain_worker_pipe();
583     // reactor free
584     swoole_event_free();
585     // worker shutdown
586     worker_stop_callback();
587 
588     if (buffer_pool) {
589         delete buffer_pool;
590     }
591 
592     return SW_OK;
593 }
594 
595 /**
596  * [Worker/TaskWorker/Master] Send data to ReactorThread
597  */
send_to_reactor_thread(const EventData * ev_data,size_t sendn,SessionId session_id)598 ssize_t Server::send_to_reactor_thread(const EventData *ev_data, size_t sendn, SessionId session_id) {
599     Socket *pipe_sock = get_reactor_pipe_socket(session_id, ev_data->info.reactor_id);
600     if (swoole_event_is_available()) {
601         return swoole_event_write(pipe_sock, ev_data, sendn);
602     } else {
603         return pipe_sock->send_blocking(ev_data, sendn);
604     }
605 }
606 
607 /**
608  * send message from worker to another worker
609  */
send_to_worker_from_worker(Worker * dst_worker,const void * buf,size_t len,int flags)610 ssize_t Server::send_to_worker_from_worker(Worker *dst_worker, const void *buf, size_t len, int flags) {
611     return dst_worker->send_pipe_message(buf, len, flags);
612 }
613 
614 /**
615  * receive data from reactor
616  */
Worker_onPipeReceive(Reactor * reactor,Event * event)617 static int Worker_onPipeReceive(Reactor *reactor, Event *event) {
618     Server *serv = (Server *) reactor->ptr;
619     PipeBuffer *pipe_buffer = serv->message_bus.get_buffer();
620 
621     if (serv->message_bus.read(event->socket) <= 0) {
622         return SW_OK;
623     }
624 
625     serv->worker_accept_event(&pipe_buffer->info);
626     serv->message_bus.pop();
627 
628     return SW_OK;
629 }
630 
send_pipe_message(const void * buf,size_t n,int flags)631 ssize_t Worker::send_pipe_message(const void *buf, size_t n, int flags) {
632     Socket *pipe_sock;
633 
634     if (flags & SW_PIPE_MASTER) {
635         pipe_sock = pipe_master;
636     } else {
637         pipe_sock = pipe_worker;
638     }
639 
640     // message-queue
641     if (pool->use_msgqueue) {
642         struct {
643             long mtype;
644             EventData buf;
645         } msg;
646 
647         msg.mtype = id + 1;
648         memcpy(&msg.buf, buf, n);
649 
650         return pool->queue->push((QueueNode *) &msg, n) ? n : -1;
651     }
652 
653     if ((flags & SW_PIPE_NONBLOCK) && swoole_event_is_available()) {
654         return swoole_event_write(pipe_sock, buf, n);
655     } else {
656         return pipe_sock->send_blocking(buf, n);
657     }
658 }
659 }  // namespace swoole
660