1 /*
2   +----------------------------------------------------------------------+
3   | Swoole                                                               |
4   +----------------------------------------------------------------------+
5   | This source file is subject to version 2.0 of the Apache license,    |
6   | that is bundled with this package in the file LICENSE, and is        |
7   | available through the world-wide-web at the following url:           |
8   | http://www.apache.org/licenses/LICENSE-2.0.html                      |
9   | If you did not receive a copy of the Apache2.0 license and are unable|
10   | to obtain it through the world-wide-web, please send a note to       |
11   | license@swoole.com so we can mail you a copy immediately.            |
12   +----------------------------------------------------------------------+
13   | Author: Tianfeng Han  <mikan.tenny@gmail.com>                        |
14   +----------------------------------------------------------------------+
15 */
16 
17 #include "swoole.h"
18 #include "swoole_socket.h"
19 #include "swoole_signal.h"
20 #include "swoole_reactor.h"
21 #include "swoole_api.h"
22 #include "swoole_c_api.h"
23 
24 namespace swoole {
25 using network::Socket;
26 
27 #ifdef SW_USE_MALLOC_TRIM
28 #ifdef __APPLE__
29 #include <sys/malloc.h>
30 #else
31 #include <malloc.h>
32 #endif
33 #endif
34 
// Hook that Reactor::activate_future_task() installs as `onBegin`; defined further down in this file.
static void reactor_begin(Reactor *reactor);

// Factory functions for the concrete reactor back ends. Each is declared only
// when its HAVE_* macro is defined; the definitions are not part of this file.
#ifdef HAVE_EPOLL
ReactorImpl *make_reactor_epoll(Reactor *_reactor, int max_events);
#endif

#ifdef HAVE_POLL
ReactorImpl *make_reactor_poll(Reactor *_reactor, int max_events);
#endif

#ifdef HAVE_KQUEUE
ReactorImpl *make_reactor_kqueue(Reactor *_reactor, int max_events);
#endif

// Declared unconditionally: the Reactor constructor uses it as the default case.
ReactorImpl *make_reactor_select(Reactor *_reactor);
50 
after_removal_failure(network::Socket * _socket)51 void ReactorImpl::after_removal_failure(network::Socket *_socket) {
52     if (!_socket->silent_remove) {
53         swoole_sys_warning("failed to delete events[fd=%d#%d, type=%d, events=%d]",
54                            _socket->fd,
55                            reactor_->id,
56                            _socket->fd_type,
57                            _socket->events);
58     }
59 }
60 
Reactor(int max_event,Type _type)61 Reactor::Reactor(int max_event, Type _type) {
62     if (_type == TYPE_AUTO) {
63 #ifdef HAVE_EPOLL
64         type_ = TYPE_EPOLL;
65 #elif defined(HAVE_KQUEUE)
66         type_ = TYPE_KQUEUE;
67 #elif defined(HAVE_POLL)
68         type_ = TYPE_POLL;
69 #else
70         type_ = TYPE_SELECT;
71 #endif
72     } else {
73         type_ = _type;
74     }
75 
76     switch (type_) {
77 #ifdef HAVE_EPOLL
78     case TYPE_EPOLL:
79         impl = make_reactor_epoll(this, max_event);
80         break;
81 #endif
82 #ifdef HAVE_KQUEUE
83     case TYPE_KQUEUE:
84         impl = make_reactor_kqueue(this, max_event);
85         break;
86 #endif
87 #ifdef HAVE_POLL
88     case TYPE_POLL:
89         impl = make_reactor_poll(this, max_event);
90         break;
91 #endif
92     case TYPE_SELECT:
93     default:
94         impl = make_reactor_select(this);
95         break;
96     }
97 
98     if (!impl->ready()) {
99         running = false;
100         return;
101     }
102 
103     running = true;
104     idle_task = {};
105     future_task = {};
106 
107     write = _write;
108     writev = _writev;
109     close = _close;
110 
111     default_write_handler = _writable_callback;
112 
113     if (swoole_isset_hook(SW_GLOBAL_HOOK_ON_REACTOR_CREATE)) {
114         swoole_call_hook(SW_GLOBAL_HOOK_ON_REACTOR_CREATE, this);
115     }
116 
117     set_end_callback(PRIORITY_DEFER_TASK, [](Reactor *reactor) {
118         CallbackManager *cm = reactor->defer_tasks;
119         if (cm) {
120             reactor->defer_tasks = nullptr;
121             cm->execute();
122             delete cm;
123         }
124     });
125 
126     set_exit_condition(EXIT_CONDITION_DEFER_TASK,
127                        [](Reactor *reactor, size_t &event_num) -> bool { return reactor->defer_tasks == nullptr; });
128 
129     set_end_callback(PRIORITY_IDLE_TASK, [](Reactor *reactor) {
130         if (reactor->idle_task.callback) {
131             reactor->idle_task.callback(reactor->idle_task.data);
132         }
133     });
134 
135     set_end_callback(PRIORITY_SIGNAL_CALLBACK, [](Reactor *reactor) {
136         if (sw_unlikely(reactor->singal_no)) {
137             swoole_signal_callback(reactor->singal_no);
138             reactor->singal_no = 0;
139         }
140     });
141 
142     set_end_callback(PRIORITY_TRY_EXIT, [](Reactor *reactor) {
143         if (reactor->wait_exit && reactor->if_exit()) {
144             reactor->running = false;
145         }
146     });
147 
148 #ifdef SW_USE_MALLOC_TRIM
149     set_end_callback(PRIORITY_MALLOC_TRIM, [](Reactor *reactor) {
150         time_t now = ::time(nullptr);
151         if (reactor->last_malloc_trim_time < now - SW_MALLOC_TRIM_INTERVAL) {
152             malloc_trim(SW_MALLOC_TRIM_PAD);
153             reactor->last_malloc_trim_time = now;
154         }
155     });
156 #endif
157 
158     set_exit_condition(EXIT_CONDITION_DEFAULT,
159                        [](Reactor *reactor, size_t &event_num) -> bool { return event_num == 0; });
160 }
161 
set_handler(int _fdtype,ReactorHandler handler)162 bool Reactor::set_handler(int _fdtype, ReactorHandler handler) {
163     int fdtype = get_fd_type(_fdtype);
164 
165     if (fdtype >= SW_MAX_FDTYPE) {
166         swoole_warning("fdtype > SW_MAX_FDTYPE[%d]", SW_MAX_FDTYPE);
167         return false;
168     }
169 
170     if (isset_read_event(_fdtype)) {
171         read_handler[fdtype] = handler;
172     } else if (isset_write_event(_fdtype)) {
173         write_handler[fdtype] = handler;
174     } else if (isset_error_event(_fdtype)) {
175         error_handler[fdtype] = handler;
176     } else {
177         swoole_warning("unknown fdtype");
178         return false;
179     }
180 
181     return true;
182 }
183 
if_exit()184 bool Reactor::if_exit() {
185     size_t _event_num = get_event_num();
186     for (auto &kv : exit_conditions) {
187         if (kv.second(this, _event_num) == false) {
188             return false;
189         }
190     }
191     return true;
192 }
193 
activate_future_task()194 void Reactor::activate_future_task() {
195     onBegin = reactor_begin;
196 }
197 
reactor_begin(Reactor * reactor)198 static void reactor_begin(Reactor *reactor) {
199     if (reactor->future_task.callback) {
200         reactor->future_task.callback(reactor->future_task.data);
201     }
202 }
203 
_close(Reactor * reactor,Socket * socket)204 int Reactor::_close(Reactor *reactor, Socket *socket) {
205     swoole_trace_log(SW_TRACE_CLOSE, "fd=%d", socket->fd);
206     socket->free();
207     return SW_OK;
208 }
209 
210 using SendFunc = std::function<ssize_t(void)>;
211 using AppendFunc = std::function<void(Buffer *buffer)>;
212 
write_func(Reactor * reactor,Socket * socket,const size_t __len,const SendFunc & send_fn,const AppendFunc & append_fn)213 static ssize_t write_func(
214     Reactor *reactor, Socket *socket, const size_t __len, const SendFunc &send_fn, const AppendFunc &append_fn) {
215     ssize_t retval;
216     Buffer *buffer = socket->out_buffer;
217     int fd = socket->fd;
218 
219     if (socket->buffer_size == 0) {
220         socket->set_memory_buffer_size(Socket::default_buffer_size);
221     }
222 
223     if (socket->nonblock == 0) {
224         socket->set_fd_option(1, -1);
225     }
226 
227     if ((uint32_t) __len > socket->buffer_size) {
228         swoole_error_log(SW_LOG_WARNING,
229                          SW_ERROR_PACKAGE_LENGTH_TOO_LARGE,
230                          "data packet is too large, cannot exceed the buffer size");
231         return SW_ERR;
232     }
233 
234     if (Buffer::empty(buffer)) {
235 #ifdef SW_USE_OPENSSL
236         if (socket->ssl_send_) {
237             goto _alloc_buffer;
238         }
239 #endif
240     _do_send:
241         retval = send_fn();
242 
243         if (retval > 0) {
244             if ((ssize_t) __len == retval) {
245                 return retval;
246             } else {
247                 goto _alloc_buffer;
248             }
249         } else if (socket->catch_error(errno) == SW_WAIT) {
250         _alloc_buffer:
251             if (!socket->out_buffer) {
252                 buffer = new Buffer(socket->chunk_size);
253                 if (!buffer) {
254                     swoole_warning("create worker buffer failed");
255                     return SW_ERR;
256                 }
257                 socket->out_buffer = buffer;
258             }
259             if (!socket->isset_writable_event()) {
260                 reactor->add_write_event(socket);
261             }
262             goto _append_buffer;
263         } else if (errno == EINTR) {
264             goto _do_send;
265         } else {
266             swoole_set_last_error(errno);
267             return SW_ERR;
268         }
269     } else {
270     _append_buffer:
271         if (buffer->length() > socket->buffer_size) {
272             if (socket->dontwait) {
273                 swoole_set_last_error(SW_ERROR_OUTPUT_BUFFER_OVERFLOW);
274                 return SW_ERR;
275             } else {
276                 swoole_error_log(
277                     SW_LOG_WARNING, SW_ERROR_OUTPUT_BUFFER_OVERFLOW, "socket#%d output buffer overflow", fd);
278                 sw_yield();
279                 socket->wait_event(SW_SOCKET_OVERFLOW_WAIT, SW_EVENT_WRITE);
280             }
281         }
282         append_fn(buffer);
283     }
284     return __len;
285 }
286 
_write(Reactor * reactor,Socket * socket,const void * buf,size_t n)287 ssize_t Reactor::_write(Reactor *reactor, Socket *socket, const void *buf, size_t n) {
288     ssize_t send_bytes = 0;
289     auto send_fn = [&send_bytes, socket, buf, n]() -> ssize_t {
290         send_bytes = socket->send(buf, n, 0);
291         return send_bytes;
292     };
293     auto append_fn = [&send_bytes, buf, n](Buffer *buffer) {
294         ssize_t offset = send_bytes > 0 ? send_bytes : 0;
295         buffer->append((const char *) buf + offset, n - offset);
296     };
297     return write_func(reactor, socket, n, send_fn, append_fn);
298 }
299 
_writev(Reactor * reactor,network::Socket * socket,const iovec * iov,size_t iovcnt)300 ssize_t Reactor::_writev(Reactor *reactor, network::Socket *socket, const iovec *iov, size_t iovcnt) {
301 #ifdef SW_USE_OPENSSL
302     if (socket->ssl) {
303         swoole_error_log(SW_LOG_WARNING, SW_ERROR_OPERATION_NOT_SUPPORT, "does not support SSL");
304         return SW_ERR;
305     }
306 #endif
307 
308     ssize_t send_bytes = 0;
309     size_t n = 0;
310     SW_LOOP_N(iovcnt) {
311         n += iov[i].iov_len;
312     }
313     auto send_fn = [&send_bytes, socket, iov, iovcnt]() -> ssize_t {
314         send_bytes = socket->writev(iov, iovcnt);
315         return send_bytes;
316     };
317     auto append_fn = [&send_bytes, iov, iovcnt](Buffer *buffer) {
318         ssize_t offset = send_bytes > 0 ? send_bytes : 0;
319         buffer->append(iov, iovcnt, offset);
320     };
321     return write_func(reactor, socket, n, send_fn, append_fn);
322 }
323 
_writable_callback(Reactor * reactor,Event * ev)324 int Reactor::_writable_callback(Reactor *reactor, Event *ev) {
325     int ret;
326 
327     Socket *socket = ev->socket;
328     Buffer *buffer = socket->out_buffer;
329 
330     while (!Buffer::empty(buffer)) {
331         BufferChunk *chunk = buffer->front();
332         if (chunk->type == BufferChunk::TYPE_CLOSE) {
333             return reactor->close(reactor, ev->socket);
334         } else if (chunk->type == BufferChunk::TYPE_SENDFILE) {
335             ret = socket->handle_sendfile();
336         } else {
337             ret = socket->handle_send();
338         }
339 
340         if (ret < 0) {
341             if (socket->close_wait) {
342                 return reactor->trigger_close_event(ev);
343             } else if (socket->send_wait) {
344                 return SW_OK;
345             }
346         }
347     }
348 
349     if (socket->send_timer) {
350         swoole_timer_del(socket->send_timer);
351         socket->send_timer = nullptr;
352     }
353 
354     // remove EPOLLOUT event
355     if (Buffer::empty(buffer)) {
356         reactor->remove_write_event(ev->socket);
357     }
358 
359     return SW_OK;
360 }
361 
drain_write_buffer(swSocket * socket)362 void Reactor::drain_write_buffer(swSocket *socket) {
363     Event event = {};
364     event.socket = socket;
365     event.fd = socket->fd;
366 
367     while (!Buffer::empty(socket->out_buffer)) {
368         if (socket->wait_event(network::Socket::default_write_timeout, SW_EVENT_WRITE) == SW_ERR) {
369             break;
370         }
371         _writable_callback(this, &event);
372         if (socket->close_wait || socket->removed) {
373             break;
374         }
375     }
376 }
377 
add_destroy_callback(Callback cb,void * data)378 void Reactor::add_destroy_callback(Callback cb, void *data) {
379     destroy_callbacks.append(cb, data);
380 }
381 
set_end_callback(enum EndCallback id,const std::function<void (Reactor *)> & fn)382 void Reactor::set_end_callback(enum EndCallback id, const std::function<void(Reactor *)> &fn) {
383     end_callbacks[id] = fn;
384 }
385 
set_exit_condition(enum ExitCondition id,const std::function<bool (Reactor *,size_t &)> & fn)386 void Reactor::set_exit_condition(enum ExitCondition id, const std::function<bool(Reactor *, size_t &)> &fn) {
387     exit_conditions[id] = fn;
388 }
389 
defer(Callback cb,void * data)390 void Reactor::defer(Callback cb, void *data) {
391     if (defer_tasks == nullptr) {
392         defer_tasks = new CallbackManager;
393     }
394     defer_tasks->append(cb, data);
395 }
396 
execute_end_callbacks(bool timedout)397 void Reactor::execute_end_callbacks(bool timedout) {
398     for (auto &kv : end_callbacks) {
399         kv.second(this);
400     }
401 }
402 
~Reactor()403 Reactor::~Reactor() {
404     destroyed = true;
405     destroy_callbacks.execute();
406     delete impl;
407     if (swoole_isset_hook(SW_GLOBAL_HOOK_ON_REACTOR_DESTROY)) {
408         swoole_call_hook(SW_GLOBAL_HOOK_ON_REACTOR_DESTROY, this);
409     }
410 }
411 
412 }  // namespace swoole
413