1 /*
2   +----------------------------------------------------------------------+
3   | Swoole                                                               |
4   +----------------------------------------------------------------------+
5   | This source file is subject to version 2.0 of the Apache license,    |
6   | that is bundled with this package in the file LICENSE, and is        |
7   | available through the world-wide-web at the following url:           |
8   | http://www.apache.org/licenses/LICENSE-2.0.html                      |
9   | If you did not receive a copy of the Apache2.0 license and are unable|
10   | to obtain it through the world-wide-web, please send a note to       |
11   | license@swoole.com so we can mail you a copy immediately.            |
12   +----------------------------------------------------------------------+
13   | Author: Tianfeng Han  <mikan.tenny@gmail.com>                        |
14   |         Twosee  <twose@qq.com>                                       |
15   +----------------------------------------------------------------------+
16 */
17 
18 #pragma once
19 
#include "swoole.h"
#include "swoole_socket.h"

#include <list>
#include <map>
#include <unordered_map>
#include <utility>
26 
27 namespace swoole {
28 
29 struct DeferCallback {
30     Callback callback;
31     void *data;
32 };
33 
34 class Reactor;
35 
36 class ReactorImpl {
37   protected:
38     Reactor *reactor_;
39 
40   public:
ReactorImpl(Reactor * _reactor)41     ReactorImpl(Reactor *_reactor) {
42         reactor_ = _reactor;
43     }
44     void after_removal_failure(network::Socket *_socket);
~ReactorImpl()45     virtual ~ReactorImpl(){};
46     virtual bool ready() = 0;
47     virtual int add(network::Socket *socket, int events) = 0;
48     virtual int set(network::Socket *socket, int events) = 0;
49     virtual int del(network::Socket *socket) = 0;
50     virtual int wait(struct timeval *) = 0;
51 };
52 
53 class CallbackManager {
54   public:
55     typedef std::list<std::pair<Callback, void *>> TaskList;
append(Callback fn,void * private_data)56     void append(Callback fn, void *private_data) {
57         list_.emplace_back(fn, private_data);
58     }
prepend(Callback fn,void * private_data)59     void prepend(Callback fn, void *private_data) {
60         list_.emplace_front(fn, private_data);
61         auto t = list_.back();
62     }
remove(TaskList::iterator iter)63     void remove(TaskList::iterator iter) {
64         list_.erase(iter);
65     }
execute()66     void execute() {
67         while (!list_.empty()) {
68             std::pair<Callback, void *> task = list_.front();
69             list_.pop_front();
70             task.first(task.second);
71         }
72     }
73 
74   protected:
75     TaskList list_;
76 };
77 
78 class Reactor {
79   public:
80     enum Type {
81         TYPE_AUTO,
82         TYPE_EPOLL,
83         TYPE_KQUEUE,
84         TYPE_POLL,
85         TYPE_SELECT,
86     };
87 
88     enum EndCallback {
89         PRIORITY_TIMER = 0,
90         PRIORITY_DEFER_TASK,
91         PRIORITY_IDLE_TASK,
92         PRIORITY_SIGNAL_CALLBACK,
93         PRIORITY_TRY_EXIT,
94         PRIORITY_MALLOC_TRIM,
95         PRIORITY_WORKER_CALLBACK,
96     };
97 
98     enum ExitCondition {
99         EXIT_CONDITION_TIMER = 0,
100         EXIT_CONDITION_DEFER_TASK,
101         EXIT_CONDITION_WAIT_PID,
102         EXIT_CONDITION_CO_SIGNAL_LISTENER,
103         EXIT_CONDITION_SIGNAL_LISTENER,
104         EXIT_CONDITION_AIO_TASK,
105         EXIT_CONDITION_SIGNALFD,
106         EXIT_CONDITION_USER_BEFORE_DEFAULT,
107         EXIT_CONDITION_DEFAULT = 999,
108         EXIT_CONDITION_USER_AFTER_DEFAULT,
109     };
110 
111     Type type_;
112     void *ptr = nullptr;
113     int native_handle = -1;
114 
115     /**
116      * last signal number
117      */
118     int singal_no = 0;
119 
120     uint32_t max_event_num = 0;
121 
122     bool running = false;
123     bool start = false;
124     bool once = false;
125     bool wait_exit = false;
126     bool destroyed = false;
127     bool bailout = false;
128     /**
129      * callback signal
130      */
131     bool check_signalfd = false;
132     /**
133      * reactor->wait timeout (millisecond) or -1
134      */
135     int32_t timeout_msec = 0;
136 
137     uint16_t id = 0;
138 
139     uint32_t max_socket = 0;
140 
141 #ifdef SW_USE_MALLOC_TRIM
142     time_t last_malloc_trim_time = 0;
143 #endif
144 
145     ReactorHandler read_handler[SW_MAX_FDTYPE] = {};
146     ReactorHandler write_handler[SW_MAX_FDTYPE] = {};
147     ReactorHandler error_handler[SW_MAX_FDTYPE] = {};
148 
149     ReactorHandler default_write_handler = nullptr;
150     ReactorHandler default_error_handler = nullptr;
151 
add(network::Socket * socket,int events)152     int add(network::Socket *socket, int events) {
153         return impl->add(socket, events);
154     }
155 
set(network::Socket * socket,int events)156     int set(network::Socket *socket, int events) {
157         return impl->set(socket, events);
158     }
159 
del(network::Socket * socket)160     int del(network::Socket *socket) {
161         return impl->del(socket);
162     }
163 
wait(struct timeval * timeout)164     int wait(struct timeval *timeout) {
165         return impl->wait(timeout);
166     }
167 
168     CallbackManager *defer_tasks = nullptr;
169     CallbackManager destroy_callbacks;
170 
171     DeferCallback idle_task;
172     DeferCallback future_task;
173 
174     std::function<void(Reactor *)> onBegin;
175 
176     ssize_t (*write)(Reactor *reactor, network::Socket *socket, const void *buf, size_t n) = nullptr;
177     ssize_t (*writev)(Reactor *reactor, network::Socket *socket, const iovec *iov, size_t iovcnt) = nullptr;
178     int (*close)(Reactor *reactor, network::Socket *socket) = nullptr;
179 
180   private:
181     ReactorImpl *impl;
182     std::map<int, std::function<void(Reactor *)>> end_callbacks;
183     std::map<int, std::function<bool(Reactor *, size_t &)>> exit_conditions;
184     std::unordered_map<int, network::Socket *> sockets_;
185 
186   public:
187     Reactor(int max_event = SW_REACTOR_MAXEVENTS, Type _type = TYPE_AUTO);
188     ~Reactor();
189     bool if_exit();
190     void defer(Callback cb, void *data = nullptr);
191     void set_end_callback(enum EndCallback id, const std::function<void(Reactor *)> &fn);
192     void set_exit_condition(enum ExitCondition id, const std::function<bool(Reactor *, size_t &)> &fn);
193     bool set_handler(int _fdtype, ReactorHandler handler);
194     void add_destroy_callback(Callback cb, void *data = nullptr);
195     void execute_end_callbacks(bool timedout = false);
196     void drain_write_buffer(network::Socket *socket);
197 
ready()198     bool ready() {
199         return running;
200     }
201 
remove_exit_condition(enum ExitCondition id)202     inline size_t remove_exit_condition(enum ExitCondition id) {
203         return exit_conditions.erase(id);
204     }
205 
isset_exit_condition(enum ExitCondition id)206     inline bool isset_exit_condition(enum ExitCondition id) {
207         return exit_conditions.find(id) != exit_conditions.end();
208     }
209 
isset_handler(int fdtype)210     inline bool isset_handler(int fdtype) {
211         return read_handler[fdtype] != nullptr;
212     }
213 
add_event(network::Socket * _socket,EventType event_type)214     inline int add_event(network::Socket *_socket, EventType event_type) {
215         if (!(_socket->events & event_type)) {
216             return set(_socket, _socket->events | event_type);
217         }
218         return SW_OK;
219     }
220 
del_event(network::Socket * _socket,EventType event_type)221     inline int del_event(network::Socket *_socket, EventType event_type) {
222         if (_socket->events & event_type) {
223             return set(_socket, _socket->events & (~event_type));
224         }
225         return SW_OK;
226     }
227 
remove_read_event(network::Socket * _socket)228     inline int remove_read_event(network::Socket *_socket) {
229         if (_socket->events & SW_EVENT_WRITE) {
230             _socket->events &= (~SW_EVENT_READ);
231             return set(_socket, _socket->events);
232         } else {
233             return del(_socket);
234         }
235     }
236 
remove_write_event(network::Socket * _socket)237     inline int remove_write_event(network::Socket *_socket) {
238         if (_socket->events & SW_EVENT_READ) {
239             _socket->events &= (~SW_EVENT_WRITE);
240             return set(_socket, _socket->events);
241         } else {
242             return del(_socket);
243         }
244     }
245 
add_read_event(network::Socket * _socket)246     inline int add_read_event(network::Socket *_socket) {
247         if (_socket->events & SW_EVENT_WRITE) {
248             _socket->events |= SW_EVENT_READ;
249             return set(_socket, _socket->events);
250         } else {
251             return add(_socket, SW_EVENT_READ);
252         }
253     }
254 
add_write_event(network::Socket * _socket)255     inline int add_write_event(network::Socket *_socket) {
256         if (_socket->events & SW_EVENT_READ) {
257             _socket->events |= SW_EVENT_WRITE;
258             return set(_socket, _socket->events);
259         } else {
260             return add(_socket, SW_EVENT_WRITE);
261         }
262     }
263 
exists(network::Socket * _socket)264     inline bool exists(network::Socket *_socket) {
265         return !_socket->removed && _socket->events;
266     }
267 
get_timeout_msec()268     inline int get_timeout_msec() {
269         return defer_tasks == nullptr ? timeout_msec : 0;
270     }
271 
get_event_num()272     size_t get_event_num() {
273         return sockets_.size();
274     }
275 
get_sockets()276     const std::unordered_map<int, network::Socket *> &get_sockets() {
277         return sockets_;
278     }
279 
get_socket(int fd)280     network::Socket *get_socket(int fd) {
281         return sockets_[fd];
282     }
283 
foreach_socket(const std::function<void (int,network::Socket *)> & callback)284     void foreach_socket(const std::function<void(int, network::Socket *)> &callback) {
285         for (auto kv : sockets_) {
286             callback(kv.first, kv.second);
287         }
288     }
289 
get_handler(EventType event_type,FdType fd_type)290     inline ReactorHandler get_handler(EventType event_type, FdType fd_type) {
291         switch (event_type) {
292         case SW_EVENT_READ:
293             return read_handler[fd_type];
294         case SW_EVENT_WRITE:
295             return write_handler[fd_type] ? write_handler[fd_type] : default_write_handler;
296         case SW_EVENT_ERROR:
297             return error_handler[fd_type] ? error_handler[fd_type] : default_error_handler;
298         default:
299             abort();
300             break;
301         }
302         return nullptr;
303     }
304 
get_error_handler(FdType fd_type)305     inline ReactorHandler get_error_handler(FdType fd_type) {
306         ReactorHandler handler = get_handler(SW_EVENT_ERROR, fd_type);
307         // error callback is not set, try to use readable or writable callback
308         if (handler == nullptr) {
309             handler = get_handler(SW_EVENT_READ, fd_type);
310             if (handler == nullptr) {
311                 handler = get_handler(SW_EVENT_WRITE, fd_type);
312             }
313         }
314         return handler;
315     }
316 
before_wait()317     inline void before_wait() {
318         start = running = true;
319     }
320 
trigger_close_event(Event * event)321     inline int trigger_close_event(Event *event) {
322         return default_error_handler(this, event);
323     }
324 
set_wait_exit(bool enable)325     inline void set_wait_exit(bool enable) {
326         wait_exit = enable;
327     }
328 
_add(network::Socket * _socket,int events)329     inline void _add(network::Socket *_socket, int events) {
330         _socket->events = events;
331         _socket->removed = 0;
332         sockets_[_socket->fd] = _socket;
333     }
334 
_set(network::Socket * _socket,int events)335     inline void _set(network::Socket *_socket, int events) {
336         _socket->events = events;
337     }
338 
_del(network::Socket * _socket)339     inline void _del(network::Socket *_socket) {
340         _socket->events = 0;
341         _socket->removed = 1;
342         sockets_.erase(_socket->fd);
343     }
344 
catch_error()345     bool catch_error() {
346         switch (errno) {
347         case EINTR:
348             return true;
349         }
350         return false;
351     }
352 
353     static ssize_t _write(Reactor *reactor, network::Socket *socket, const void *buf, size_t n);
354     static ssize_t _writev(Reactor *reactor, network::Socket *socket, const iovec *iov, size_t iovcnt);
355     static int _close(Reactor *reactor, network::Socket *socket);
356     static int _writable_callback(Reactor *reactor, Event *ev);
357 
358     void activate_future_task();
359 
get_fd_type(int flags)360     static FdType get_fd_type(int flags) {
361         return (FdType)(flags & (~SW_EVENT_READ) & (~SW_EVENT_WRITE) & (~SW_EVENT_ERROR) & (~SW_EVENT_ONCE));
362     }
363 
isset_read_event(int events)364     static bool isset_read_event(int events) {
365         return (events < SW_EVENT_DEAULT) || (events & SW_EVENT_READ);
366     }
367 
isset_write_event(int events)368     static bool isset_write_event(int events) {
369         return events & SW_EVENT_WRITE;
370     }
371 
isset_error_event(int events)372     static bool isset_error_event(int events) {
373         return events & SW_EVENT_ERROR;
374     }
375 };
376 }  // namespace swoole
377 
/*
 * Finishes one pass of a reactor wait loop: moves on to the next iteration
 * unless the reactor is in `once` mode, in which case the loop is left.
 * Must be expanded directly inside a loop body with `reactor_` in scope; it
 * cannot be wrapped in do { } while (0) because break/continue have to bind
 * to the caller's loop.
 */
#define SW_REACTOR_CONTINUE                                                                                            \
    if (!reactor_->once) {                                                                                             \
        continue;                                                                                                      \
    } else {                                                                                                           \
        break;                                                                                                         \
    }
384