1 /* 2 +----------------------------------------------------------------------+ 3 | Swoole | 4 +----------------------------------------------------------------------+ 5 | This source file is subject to version 2.0 of the Apache license, | 6 | that is bundled with this package in the file LICENSE, and is | 7 | available through the world-wide-web at the following url: | 8 | http://www.apache.org/licenses/LICENSE-2.0.html | 9 | If you did not receive a copy of the Apache2.0 license and are unable| 10 | to obtain it through the world-wide-web, please send a note to | 11 | license@swoole.com so we can mail you a copy immediately. | 12 +----------------------------------------------------------------------+ 13 | Author: Tianfeng Han <mikan.tenny@gmail.com> | 14 | Twosee <twose@qq.com> | 15 +----------------------------------------------------------------------+ 16 */ 17 18 #pragma once 19 20 #include "swoole.h" 21 #include "swoole_socket.h" 22 23 #include <list> 24 #include <map> 25 #include <unordered_map> 26 27 namespace swoole { 28 29 struct DeferCallback { 30 Callback callback; 31 void *data; 32 }; 33 34 class Reactor; 35 36 class ReactorImpl { 37 protected: 38 Reactor *reactor_; 39 40 public: ReactorImpl(Reactor * _reactor)41 ReactorImpl(Reactor *_reactor) { 42 reactor_ = _reactor; 43 } 44 void after_removal_failure(network::Socket *_socket); ~ReactorImpl()45 virtual ~ReactorImpl(){}; 46 virtual bool ready() = 0; 47 virtual int add(network::Socket *socket, int events) = 0; 48 virtual int set(network::Socket *socket, int events) = 0; 49 virtual int del(network::Socket *socket) = 0; 50 virtual int wait(struct timeval *) = 0; 51 }; 52 53 class CallbackManager { 54 public: 55 typedef std::list<std::pair<Callback, void *>> TaskList; append(Callback fn,void * private_data)56 void append(Callback fn, void *private_data) { 57 list_.emplace_back(fn, private_data); 58 } prepend(Callback fn,void * private_data)59 void prepend(Callback fn, void *private_data) { 60 list_.emplace_front(fn, private_data); 61 auto t = list_.back(); 62 } remove(TaskList::iterator iter)63 void remove(TaskList::iterator iter) { 64 list_.erase(iter); 65 } execute()66 void execute() { 67 while (!list_.empty()) { 68 std::pair<Callback, void *> task = list_.front(); 69 list_.pop_front(); 70 task.first(task.second); 71 } 72 } 73 74 protected: 75 TaskList list_; 76 }; 77 78 class Reactor { 79 public: 80 enum Type { 81 TYPE_AUTO, 82 TYPE_EPOLL, 83 TYPE_KQUEUE, 84 TYPE_POLL, 85 TYPE_SELECT, 86 }; 87 88 enum EndCallback { 89 PRIORITY_TIMER = 0, 90 PRIORITY_DEFER_TASK, 91 PRIORITY_IDLE_TASK, 92 PRIORITY_SIGNAL_CALLBACK, 93 PRIORITY_TRY_EXIT, 94 PRIORITY_MALLOC_TRIM, 95 PRIORITY_WORKER_CALLBACK, 96 }; 97 98 enum ExitCondition { 99 EXIT_CONDITION_TIMER = 0, 100 EXIT_CONDITION_DEFER_TASK, 101 EXIT_CONDITION_WAIT_PID, 102 EXIT_CONDITION_CO_SIGNAL_LISTENER, 103 EXIT_CONDITION_SIGNAL_LISTENER, 104 EXIT_CONDITION_AIO_TASK, 105 EXIT_CONDITION_SIGNALFD, 106 EXIT_CONDITION_USER_BEFORE_DEFAULT, 107 EXIT_CONDITION_DEFAULT = 999, 108 EXIT_CONDITION_USER_AFTER_DEFAULT, 109 }; 110 111 Type type_; 112 void *ptr = nullptr; 113 int native_handle = -1; 114 115 /** 116 * last signal number 117 */ 118 int singal_no = 0; 119 120 uint32_t max_event_num = 0; 121 122 bool running = false; 123 bool start = false; 124 bool once = false; 125 bool wait_exit = false; 126 bool destroyed = false; 127 bool bailout = false; 128 /** 129 * callback signal 130 */ 131 bool check_signalfd = false; 132 /** 133 * reactor->wait timeout (millisecond) or -1 134 */ 135 int32_t timeout_msec = 0; 136 137 uint16_t id = 0; 138 139 uint32_t max_socket = 0; 140 141 #ifdef SW_USE_MALLOC_TRIM 142 time_t last_malloc_trim_time = 0; 143 #endif 144 145 ReactorHandler read_handler[SW_MAX_FDTYPE] = {}; 146 ReactorHandler write_handler[SW_MAX_FDTYPE] = {}; 147 ReactorHandler error_handler[SW_MAX_FDTYPE] = {}; 148 149 ReactorHandler default_write_handler = nullptr; 150 ReactorHandler default_error_handler = nullptr; 151 add(network::Socket * socket,int events)152 int add(network::Socket *socket, int events) { 153 return impl->add(socket, events); 154 } 155 set(network::Socket * socket,int events)156 int set(network::Socket *socket, int events) { 157 return impl->set(socket, events); 158 } 159 del(network::Socket * socket)160 int del(network::Socket *socket) { 161 return impl->del(socket); 162 } 163 wait(struct timeval * timeout)164 int wait(struct timeval *timeout) { 165 return impl->wait(timeout); 166 } 167 168 CallbackManager *defer_tasks = nullptr; 169 CallbackManager destroy_callbacks; 170 171 DeferCallback idle_task; 172 DeferCallback future_task; 173 174 std::function<void(Reactor *)> onBegin; 175 176 ssize_t (*write)(Reactor *reactor, network::Socket *socket, const void *buf, size_t n) = nullptr; 177 ssize_t (*writev)(Reactor *reactor, network::Socket *socket, const iovec *iov, size_t iovcnt) = nullptr; 178 int (*close)(Reactor *reactor, network::Socket *socket) = nullptr; 179 180 private: 181 ReactorImpl *impl; 182 std::map<int, std::function<void(Reactor *)>> end_callbacks; 183 std::map<int, std::function<bool(Reactor *, size_t &)>> exit_conditions; 184 std::unordered_map<int, network::Socket *> sockets_; 185 186 public: 187 Reactor(int max_event = SW_REACTOR_MAXEVENTS, Type _type = TYPE_AUTO); 188 ~Reactor(); 189 bool if_exit(); 190 void defer(Callback cb, void *data = nullptr); 191 void set_end_callback(enum EndCallback id, const std::function<void(Reactor *)> &fn); 192 void set_exit_condition(enum ExitCondition id, const std::function<bool(Reactor *, size_t &)> &fn); 193 bool set_handler(int _fdtype, ReactorHandler handler); 194 void add_destroy_callback(Callback cb, void *data = nullptr); 195 void execute_end_callbacks(bool timedout = false); 196 void drain_write_buffer(network::Socket *socket); 197 ready()198 bool ready() { 199 return running; 200 } 201 remove_exit_condition(enum ExitCondition id)202 inline size_t remove_exit_condition(enum ExitCondition id) { 203 return exit_conditions.erase(id); 204 } 205 isset_exit_condition(enum ExitCondition id)206 inline bool isset_exit_condition(enum ExitCondition id) { 207 return exit_conditions.find(id) != exit_conditions.end(); 208 } 209 isset_handler(int fdtype)210 inline bool isset_handler(int fdtype) { 211 return read_handler[fdtype] != nullptr; 212 } 213 add_event(network::Socket * _socket,EventType event_type)214 inline int add_event(network::Socket *_socket, EventType event_type) { 215 if (!(_socket->events & event_type)) { 216 return set(_socket, _socket->events | event_type); 217 } 218 return SW_OK; 219 } 220 del_event(network::Socket * _socket,EventType event_type)221 inline int del_event(network::Socket *_socket, EventType event_type) { 222 if (_socket->events & event_type) { 223 return set(_socket, _socket->events & (~event_type)); 224 } 225 return SW_OK; 226 } 227 remove_read_event(network::Socket * _socket)228 inline int remove_read_event(network::Socket *_socket) { 229 if (_socket->events & SW_EVENT_WRITE) { 230 _socket->events &= (~SW_EVENT_READ); 231 return set(_socket, _socket->events); 232 } else { 233 return del(_socket); 234 } 235 } 236 remove_write_event(network::Socket * _socket)237 inline int remove_write_event(network::Socket *_socket) { 238 if (_socket->events & SW_EVENT_READ) { 239 _socket->events &= (~SW_EVENT_WRITE); 240 return set(_socket, _socket->events); 241 } else { 242 return del(_socket); 243 } 244 } 245 add_read_event(network::Socket * _socket)246 inline int add_read_event(network::Socket *_socket) { 247 if (_socket->events & SW_EVENT_WRITE) { 248 _socket->events |= SW_EVENT_READ; 249 return set(_socket, _socket->events); 250 } else { 251 return add(_socket, SW_EVENT_READ); 252 } 253 } 254 add_write_event(network::Socket * _socket)255 inline int add_write_event(network::Socket *_socket) { 256 if (_socket->events & SW_EVENT_READ) { 257 _socket->events |= SW_EVENT_WRITE; 258 return set(_socket, _socket->events); 259 } else { 260 return add(_socket, SW_EVENT_WRITE); 261 } 262 } 263 exists(network::Socket * _socket)264 inline bool exists(network::Socket *_socket) { 265 return !_socket->removed && _socket->events; 266 } 267 get_timeout_msec()268 inline int get_timeout_msec() { 269 return defer_tasks == nullptr ? timeout_msec : 0; 270 } 271 get_event_num()272 size_t get_event_num() { 273 return sockets_.size(); 274 } 275 get_sockets()276 const std::unordered_map<int, network::Socket *> &get_sockets() { 277 return sockets_; 278 } 279 get_socket(int fd)280 network::Socket *get_socket(int fd) { 281 return sockets_[fd]; 282 } 283 foreach_socket(const std::function<void (int,network::Socket *)> & callback)284 void foreach_socket(const std::function<void(int, network::Socket *)> &callback) { 285 for (auto kv : sockets_) { 286 callback(kv.first, kv.second); 287 } 288 } 289 get_handler(EventType event_type,FdType fd_type)290 inline ReactorHandler get_handler(EventType event_type, FdType fd_type) { 291 switch (event_type) { 292 case SW_EVENT_READ: 293 return read_handler[fd_type]; 294 case SW_EVENT_WRITE: 295 return write_handler[fd_type] ? write_handler[fd_type] : default_write_handler; 296 case SW_EVENT_ERROR: 297 return error_handler[fd_type] ? error_handler[fd_type] : default_error_handler; 298 default: 299 abort(); 300 break; 301 } 302 return nullptr; 303 } 304 get_error_handler(FdType fd_type)305 inline ReactorHandler get_error_handler(FdType fd_type) { 306 ReactorHandler handler = get_handler(SW_EVENT_ERROR, fd_type); 307 // error callback is not set, try to use readable or writable callback 308 if (handler == nullptr) { 309 handler = get_handler(SW_EVENT_READ, fd_type); 310 if (handler == nullptr) { 311 handler = get_handler(SW_EVENT_WRITE, fd_type); 312 } 313 } 314 return handler; 315 } 316 before_wait()317 inline void before_wait() { 318 start = running = true; 319 } 320 trigger_close_event(Event * event)321 inline int trigger_close_event(Event *event) { 322 return default_error_handler(this, event); 323 } 324 set_wait_exit(bool enable)325 inline void set_wait_exit(bool enable) { 326 wait_exit = enable; 327 } 328 _add(network::Socket * _socket,int events)329 inline void _add(network::Socket *_socket, int events) { 330 _socket->events = events; 331 _socket->removed = 0; 332 sockets_[_socket->fd] = _socket; 333 } 334 _set(network::Socket * _socket,int events)335 inline void _set(network::Socket *_socket, int events) { 336 _socket->events = events; 337 } 338 _del(network::Socket * _socket)339 inline void _del(network::Socket *_socket) { 340 _socket->events = 0; 341 _socket->removed = 1; 342 sockets_.erase(_socket->fd); 343 } 344 catch_error()345 bool catch_error() { 346 switch (errno) { 347 case EINTR: 348 return true; 349 } 350 return false; 351 } 352 353 static ssize_t _write(Reactor *reactor, network::Socket *socket, const void *buf, size_t n); 354 static ssize_t _writev(Reactor *reactor, network::Socket *socket, const iovec *iov, size_t iovcnt); 355 static int _close(Reactor *reactor, network::Socket *socket); 356 static int _writable_callback(Reactor *reactor, Event *ev); 357 358 void activate_future_task(); 359 get_fd_type(int flags)360 static FdType get_fd_type(int flags) { 361 return (FdType)(flags & (~SW_EVENT_READ) & (~SW_EVENT_WRITE) & (~SW_EVENT_ERROR) & (~SW_EVENT_ONCE)); 362 } 363 isset_read_event(int events)364 static bool isset_read_event(int events) { 365 return (events < SW_EVENT_DEAULT) || (events & SW_EVENT_READ); 366 } 367 isset_write_event(int events)368 static bool isset_write_event(int events) { 369 return events & SW_EVENT_WRITE; 370 } 371 isset_error_event(int events)372 static bool isset_error_event(int events) { 373 return events & SW_EVENT_ERROR; 374 } 375 }; 376 } // namespace swoole 377 378 #define SW_REACTOR_CONTINUE \ 379 if (reactor_->once) { \ 380 break; \ 381 } else { \ 382 continue; \ 383 } 384