1 /*
2 +----------------------------------------------------------------------+
3 | Swoole |
4 +----------------------------------------------------------------------+
5 | This source file is subject to version 2.0 of the Apache license, |
6 | that is bundled with this package in the file LICENSE, and is |
7 | available through the world-wide-web at the following url: |
8 | http://www.apache.org/licenses/LICENSE-2.0.html |
9 | If you did not receive a copy of the Apache2.0 license and are unable|
10 | to obtain it through the world-wide-web, please send a note to |
11 | license@swoole.com so we can mail you a copy immediately. |
12 +----------------------------------------------------------------------+
13 | Author: Tianfeng Han <mikan.tenny@gmail.com> |
14 +----------------------------------------------------------------------+
15 */
16
17 #include "swoole.h"
18 #include "swoole_socket.h"
19 #include "swoole_signal.h"
20 #include "swoole_reactor.h"
21 #include "swoole_api.h"
22 #include "swoole_c_api.h"
23
24 namespace swoole {
25 using network::Socket;
26
27 #ifdef SW_USE_MALLOC_TRIM
28 #ifdef __APPLE__
29 #include <sys/malloc.h>
30 #else
31 #include <malloc.h>
32 #endif
33 #endif
34
35 static void reactor_begin(Reactor *reactor);
36
37 #ifdef HAVE_EPOLL
38 ReactorImpl *make_reactor_epoll(Reactor *_reactor, int max_events);
39 #endif
40
41 #ifdef HAVE_POLL
42 ReactorImpl *make_reactor_poll(Reactor *_reactor, int max_events);
43 #endif
44
45 #ifdef HAVE_KQUEUE
46 ReactorImpl *make_reactor_kqueue(Reactor *_reactor, int max_events);
47 #endif
48
49 ReactorImpl *make_reactor_select(Reactor *_reactor);
50
after_removal_failure(network::Socket * _socket)51 void ReactorImpl::after_removal_failure(network::Socket *_socket) {
52 if (!_socket->silent_remove) {
53 swoole_sys_warning("failed to delete events[fd=%d#%d, type=%d, events=%d]",
54 _socket->fd,
55 reactor_->id,
56 _socket->fd_type,
57 _socket->events);
58 }
59 }
60
Reactor(int max_event,Type _type)61 Reactor::Reactor(int max_event, Type _type) {
62 if (_type == TYPE_AUTO) {
63 #ifdef HAVE_EPOLL
64 type_ = TYPE_EPOLL;
65 #elif defined(HAVE_KQUEUE)
66 type_ = TYPE_KQUEUE;
67 #elif defined(HAVE_POLL)
68 type_ = TYPE_POLL;
69 #else
70 type_ = TYPE_SELECT;
71 #endif
72 } else {
73 type_ = _type;
74 }
75
76 switch (type_) {
77 #ifdef HAVE_EPOLL
78 case TYPE_EPOLL:
79 impl = make_reactor_epoll(this, max_event);
80 break;
81 #endif
82 #ifdef HAVE_KQUEUE
83 case TYPE_KQUEUE:
84 impl = make_reactor_kqueue(this, max_event);
85 break;
86 #endif
87 #ifdef HAVE_POLL
88 case TYPE_POLL:
89 impl = make_reactor_poll(this, max_event);
90 break;
91 #endif
92 case TYPE_SELECT:
93 default:
94 impl = make_reactor_select(this);
95 break;
96 }
97
98 if (!impl->ready()) {
99 running = false;
100 return;
101 }
102
103 running = true;
104 idle_task = {};
105 future_task = {};
106
107 write = _write;
108 writev = _writev;
109 close = _close;
110
111 default_write_handler = _writable_callback;
112
113 if (swoole_isset_hook(SW_GLOBAL_HOOK_ON_REACTOR_CREATE)) {
114 swoole_call_hook(SW_GLOBAL_HOOK_ON_REACTOR_CREATE, this);
115 }
116
117 set_end_callback(PRIORITY_DEFER_TASK, [](Reactor *reactor) {
118 CallbackManager *cm = reactor->defer_tasks;
119 if (cm) {
120 reactor->defer_tasks = nullptr;
121 cm->execute();
122 delete cm;
123 }
124 });
125
126 set_exit_condition(EXIT_CONDITION_DEFER_TASK,
127 [](Reactor *reactor, size_t &event_num) -> bool { return reactor->defer_tasks == nullptr; });
128
129 set_end_callback(PRIORITY_IDLE_TASK, [](Reactor *reactor) {
130 if (reactor->idle_task.callback) {
131 reactor->idle_task.callback(reactor->idle_task.data);
132 }
133 });
134
135 set_end_callback(PRIORITY_SIGNAL_CALLBACK, [](Reactor *reactor) {
136 if (sw_unlikely(reactor->singal_no)) {
137 swoole_signal_callback(reactor->singal_no);
138 reactor->singal_no = 0;
139 }
140 });
141
142 set_end_callback(PRIORITY_TRY_EXIT, [](Reactor *reactor) {
143 if (reactor->wait_exit && reactor->if_exit()) {
144 reactor->running = false;
145 }
146 });
147
148 #ifdef SW_USE_MALLOC_TRIM
149 set_end_callback(PRIORITY_MALLOC_TRIM, [](Reactor *reactor) {
150 time_t now = ::time(nullptr);
151 if (reactor->last_malloc_trim_time < now - SW_MALLOC_TRIM_INTERVAL) {
152 malloc_trim(SW_MALLOC_TRIM_PAD);
153 reactor->last_malloc_trim_time = now;
154 }
155 });
156 #endif
157
158 set_exit_condition(EXIT_CONDITION_DEFAULT,
159 [](Reactor *reactor, size_t &event_num) -> bool { return event_num == 0; });
160 }
161
set_handler(int _fdtype,ReactorHandler handler)162 bool Reactor::set_handler(int _fdtype, ReactorHandler handler) {
163 int fdtype = get_fd_type(_fdtype);
164
165 if (fdtype >= SW_MAX_FDTYPE) {
166 swoole_warning("fdtype > SW_MAX_FDTYPE[%d]", SW_MAX_FDTYPE);
167 return false;
168 }
169
170 if (isset_read_event(_fdtype)) {
171 read_handler[fdtype] = handler;
172 } else if (isset_write_event(_fdtype)) {
173 write_handler[fdtype] = handler;
174 } else if (isset_error_event(_fdtype)) {
175 error_handler[fdtype] = handler;
176 } else {
177 swoole_warning("unknown fdtype");
178 return false;
179 }
180
181 return true;
182 }
183
if_exit()184 bool Reactor::if_exit() {
185 size_t _event_num = get_event_num();
186 for (auto &kv : exit_conditions) {
187 if (kv.second(this, _event_num) == false) {
188 return false;
189 }
190 }
191 return true;
192 }
193
activate_future_task()194 void Reactor::activate_future_task() {
195 onBegin = reactor_begin;
196 }
197
reactor_begin(Reactor * reactor)198 static void reactor_begin(Reactor *reactor) {
199 if (reactor->future_task.callback) {
200 reactor->future_task.callback(reactor->future_task.data);
201 }
202 }
203
_close(Reactor * reactor,Socket * socket)204 int Reactor::_close(Reactor *reactor, Socket *socket) {
205 swoole_trace_log(SW_TRACE_CLOSE, "fd=%d", socket->fd);
206 socket->free();
207 return SW_OK;
208 }
209
210 using SendFunc = std::function<ssize_t(void)>;
211 using AppendFunc = std::function<void(Buffer *buffer)>;
212
write_func(Reactor * reactor,Socket * socket,const size_t __len,const SendFunc & send_fn,const AppendFunc & append_fn)213 static ssize_t write_func(
214 Reactor *reactor, Socket *socket, const size_t __len, const SendFunc &send_fn, const AppendFunc &append_fn) {
215 ssize_t retval;
216 Buffer *buffer = socket->out_buffer;
217 int fd = socket->fd;
218
219 if (socket->buffer_size == 0) {
220 socket->set_memory_buffer_size(Socket::default_buffer_size);
221 }
222
223 if (socket->nonblock == 0) {
224 socket->set_fd_option(1, -1);
225 }
226
227 if ((uint32_t) __len > socket->buffer_size) {
228 swoole_error_log(SW_LOG_WARNING,
229 SW_ERROR_PACKAGE_LENGTH_TOO_LARGE,
230 "data packet is too large, cannot exceed the buffer size");
231 return SW_ERR;
232 }
233
234 if (Buffer::empty(buffer)) {
235 #ifdef SW_USE_OPENSSL
236 if (socket->ssl_send_) {
237 goto _alloc_buffer;
238 }
239 #endif
240 _do_send:
241 retval = send_fn();
242
243 if (retval > 0) {
244 if ((ssize_t) __len == retval) {
245 return retval;
246 } else {
247 goto _alloc_buffer;
248 }
249 } else if (socket->catch_error(errno) == SW_WAIT) {
250 _alloc_buffer:
251 if (!socket->out_buffer) {
252 buffer = new Buffer(socket->chunk_size);
253 if (!buffer) {
254 swoole_warning("create worker buffer failed");
255 return SW_ERR;
256 }
257 socket->out_buffer = buffer;
258 }
259 if (!socket->isset_writable_event()) {
260 reactor->add_write_event(socket);
261 }
262 goto _append_buffer;
263 } else if (errno == EINTR) {
264 goto _do_send;
265 } else {
266 swoole_set_last_error(errno);
267 return SW_ERR;
268 }
269 } else {
270 _append_buffer:
271 if (buffer->length() > socket->buffer_size) {
272 if (socket->dontwait) {
273 swoole_set_last_error(SW_ERROR_OUTPUT_BUFFER_OVERFLOW);
274 return SW_ERR;
275 } else {
276 swoole_error_log(
277 SW_LOG_WARNING, SW_ERROR_OUTPUT_BUFFER_OVERFLOW, "socket#%d output buffer overflow", fd);
278 sw_yield();
279 socket->wait_event(SW_SOCKET_OVERFLOW_WAIT, SW_EVENT_WRITE);
280 }
281 }
282 append_fn(buffer);
283 }
284 return __len;
285 }
286
_write(Reactor * reactor,Socket * socket,const void * buf,size_t n)287 ssize_t Reactor::_write(Reactor *reactor, Socket *socket, const void *buf, size_t n) {
288 ssize_t send_bytes = 0;
289 auto send_fn = [&send_bytes, socket, buf, n]() -> ssize_t {
290 send_bytes = socket->send(buf, n, 0);
291 return send_bytes;
292 };
293 auto append_fn = [&send_bytes, buf, n](Buffer *buffer) {
294 ssize_t offset = send_bytes > 0 ? send_bytes : 0;
295 buffer->append((const char *) buf + offset, n - offset);
296 };
297 return write_func(reactor, socket, n, send_fn, append_fn);
298 }
299
_writev(Reactor * reactor,network::Socket * socket,const iovec * iov,size_t iovcnt)300 ssize_t Reactor::_writev(Reactor *reactor, network::Socket *socket, const iovec *iov, size_t iovcnt) {
301 #ifdef SW_USE_OPENSSL
302 if (socket->ssl) {
303 swoole_error_log(SW_LOG_WARNING, SW_ERROR_OPERATION_NOT_SUPPORT, "does not support SSL");
304 return SW_ERR;
305 }
306 #endif
307
308 ssize_t send_bytes = 0;
309 size_t n = 0;
310 SW_LOOP_N(iovcnt) {
311 n += iov[i].iov_len;
312 }
313 auto send_fn = [&send_bytes, socket, iov, iovcnt]() -> ssize_t {
314 send_bytes = socket->writev(iov, iovcnt);
315 return send_bytes;
316 };
317 auto append_fn = [&send_bytes, iov, iovcnt](Buffer *buffer) {
318 ssize_t offset = send_bytes > 0 ? send_bytes : 0;
319 buffer->append(iov, iovcnt, offset);
320 };
321 return write_func(reactor, socket, n, send_fn, append_fn);
322 }
323
_writable_callback(Reactor * reactor,Event * ev)324 int Reactor::_writable_callback(Reactor *reactor, Event *ev) {
325 int ret;
326
327 Socket *socket = ev->socket;
328 Buffer *buffer = socket->out_buffer;
329
330 while (!Buffer::empty(buffer)) {
331 BufferChunk *chunk = buffer->front();
332 if (chunk->type == BufferChunk::TYPE_CLOSE) {
333 return reactor->close(reactor, ev->socket);
334 } else if (chunk->type == BufferChunk::TYPE_SENDFILE) {
335 ret = socket->handle_sendfile();
336 } else {
337 ret = socket->handle_send();
338 }
339
340 if (ret < 0) {
341 if (socket->close_wait) {
342 return reactor->trigger_close_event(ev);
343 } else if (socket->send_wait) {
344 return SW_OK;
345 }
346 }
347 }
348
349 if (socket->send_timer) {
350 swoole_timer_del(socket->send_timer);
351 socket->send_timer = nullptr;
352 }
353
354 // remove EPOLLOUT event
355 if (Buffer::empty(buffer)) {
356 reactor->remove_write_event(ev->socket);
357 }
358
359 return SW_OK;
360 }
361
drain_write_buffer(swSocket * socket)362 void Reactor::drain_write_buffer(swSocket *socket) {
363 Event event = {};
364 event.socket = socket;
365 event.fd = socket->fd;
366
367 while (!Buffer::empty(socket->out_buffer)) {
368 if (socket->wait_event(network::Socket::default_write_timeout, SW_EVENT_WRITE) == SW_ERR) {
369 break;
370 }
371 _writable_callback(this, &event);
372 if (socket->close_wait || socket->removed) {
373 break;
374 }
375 }
376 }
377
add_destroy_callback(Callback cb,void * data)378 void Reactor::add_destroy_callback(Callback cb, void *data) {
379 destroy_callbacks.append(cb, data);
380 }
381
set_end_callback(enum EndCallback id,const std::function<void (Reactor *)> & fn)382 void Reactor::set_end_callback(enum EndCallback id, const std::function<void(Reactor *)> &fn) {
383 end_callbacks[id] = fn;
384 }
385
set_exit_condition(enum ExitCondition id,const std::function<bool (Reactor *,size_t &)> & fn)386 void Reactor::set_exit_condition(enum ExitCondition id, const std::function<bool(Reactor *, size_t &)> &fn) {
387 exit_conditions[id] = fn;
388 }
389
defer(Callback cb,void * data)390 void Reactor::defer(Callback cb, void *data) {
391 if (defer_tasks == nullptr) {
392 defer_tasks = new CallbackManager;
393 }
394 defer_tasks->append(cb, data);
395 }
396
execute_end_callbacks(bool timedout)397 void Reactor::execute_end_callbacks(bool timedout) {
398 for (auto &kv : end_callbacks) {
399 kv.second(this);
400 }
401 }
402
~Reactor()403 Reactor::~Reactor() {
404 destroyed = true;
405 destroy_callbacks.execute();
406 delete impl;
407 if (swoole_isset_hook(SW_GLOBAL_HOOK_ON_REACTOR_DESTROY)) {
408 swoole_call_hook(SW_GLOBAL_HOOK_ON_REACTOR_DESTROY, this);
409 }
410 }
411
412 } // namespace swoole
413