/*
 +----------------------------------------------------------------------+
 | Swoole                                                               |
 +----------------------------------------------------------------------+
 | This source file is subject to version 2.0 of the Apache license,    |
 | that is bundled with this package in the file LICENSE, and is        |
 | available through the world-wide-web at the following url:           |
 | http://www.apache.org/licenses/LICENSE-2.0.html                      |
 | If you did not receive a copy of the Apache2.0 license and are unable|
 | to obtain it through the world-wide-web, please send a note to       |
 | license@swoole.com so we can mail you a copy immediately.            |
 +----------------------------------------------------------------------+
 | Author: Tianfeng Han  <mikan.tenny@gmail.com>                        |
 +----------------------------------------------------------------------+
 */

#include "swoole_api.h"
#include "swoole_socket.h"
#include "swoole_reactor.h"
#include "swoole_string.h"
#include "swoole_signal.h"
#include "swoole_pipe.h"
#include "swoole_async.h"
#include "swoole_util.h"

#include <thread>
#include <atomic>
#include <unordered_map>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <sstream>

namespace swoole {
namespace async {
//-------------------------------------------------------------------------------
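// Plain FIFO of raw AsyncEvent pointers. Not thread-safe by itself;
// ThreadPool guards every access with its event_mutex.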
class EventQueue {
  public:
    inline void push(AsyncEvent *event) {
        _queue.push(event);
    }

    inline AsyncEvent *pop() {
        if (_queue.empty()) {
            return nullptr;
        }
        AsyncEvent *retval = _queue.front();
        _queue.pop();
        return retval;
    }

    inline double get_max_wait_time() {
        if (_queue.empty()) {
            return 0;
        } else {
            AsyncEvent *event = _queue.front();
            return microtime() - event->timestamp;
        }
    }

    inline size_t count() {
        return _queue.size();
    }

  private:
    std::queue<AsyncEvent *> _queue;
};

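// Worker pool for blocking AIO tasks. The reactor thread copies events into
// the shared queue via dispatch(); workers pop them, run the handler and write
// the finished event's pointer back through a pipe so the reactor can run its
// callback. schedule() grows the pool under load, and idle non-core workers
// retire themselves after max_idle_time.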
class ThreadPool {
  public:
    ThreadPool(size_t _core_worker_num, size_t _worker_num, double _max_wait_time, double _max_idle_time) {
        running = false;

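        // A value of 0 selects the built-in default: CPU count for core
        // workers, CPU count * SW_AIO_THREAD_NUM_MULTIPLE as the upper bound,
        // and the SW_AIO_* macros for the wait/idle thresholds.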
        core_worker_num = _core_worker_num == 0 ? SW_CPU_NUM : SW_MAX(1, _core_worker_num);
        worker_num = _worker_num == 0 ? SW_CPU_NUM * SW_AIO_THREAD_NUM_MULTIPLE : SW_MAX(core_worker_num, _worker_num);
        max_wait_time = _max_wait_time == 0 ? SW_AIO_TASK_MAX_WAIT_TIME : _max_wait_time;
        max_idle_time = _max_idle_time == 0 ? SW_AIO_THREAD_MAX_IDLE_TIME : _max_idle_time;
    }

    ~ThreadPool() {
        shutdown();
    }

    bool start() {
        running = true;
        current_task_id = 0;
        n_waiting = 0;
        n_closing = 0;
        for (size_t i = 0; i < core_worker_num; i++) {
            create_thread(true);
        }
        return true;
    }

    bool shutdown() {
        if (!running) {
            return false;
        }

        event_mutex.lock();
        running = false;
        _cv.notify_all();
        event_mutex.unlock();

        for (auto &i : threads) {
            std::thread *_thread = i.second;
            if (_thread->joinable()) {
                _thread->join();
            }
            delete _thread;
        }

        return true;
    }

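    // Called on the reactor thread. If no worker is waiting, the pool is not
    // yet at worker_num and the oldest queued task has waited longer than
    // max_wait_time, grow the pool by creating extra threads.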
    void schedule() {
        if (n_waiting == 0 && threads.size() < worker_num && max_wait_time > 0) {
            event_mutex.lock();
            double _max_wait_time = _queue.get_max_wait_time();
            event_mutex.unlock();

            if (_max_wait_time > max_wait_time) {
                size_t n = 1;
                /**
                 * maybe we can find a better strategy
                 */
                if (threads.size() + n > worker_num) {
                    n = worker_num - threads.size();
                }
                swoole_trace_log(SW_TRACE_AIO,
                                 "create %zu thread(s) due to a %fs wait, we will have %zu threads",
                                 n,
                                 _max_wait_time,
                                 threads.size() + n);
                while (n--) {
                    create_thread();
                }
            }
        }
    }

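    // Called from the reactor thread: copy the caller's request, stamp it with
    // a task id, enqueue timestamp and reply socket, push it onto the queue and
    // wake one worker.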
    AsyncEvent *dispatch(const AsyncEvent *request) {
        if (SwooleTG.async_threads->schedule) {
            schedule();
        }
        auto _event_copy = new AsyncEvent(*request);
        _event_copy->task_id = current_task_id++;
        _event_copy->timestamp = microtime();
        _event_copy->pipe_socket = SwooleTG.async_threads->write_socket;
        event_mutex.lock();
        _queue.push(_event_copy);
        _cv.notify_one();
        event_mutex.unlock();
        swoole_debug("push and notify one: %f", microtime());
        return _event_copy;
    }

    inline size_t get_worker_num() {
        return threads.size();
    }

    inline size_t get_queue_size() {
        std::unique_lock<std::mutex> lock(event_mutex);
        return _queue.count();
    }

    static std::string get_thread_id(std::thread::id id) {
        std::stringstream ss;
        ss << id;
        return ss.str();
    }

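    // Join an exiting worker thread and drop it from the registry; invoked on
    // the reactor thread via release_callback().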
    void release_thread(std::thread::id tid) {
        auto i = threads.find(tid);
        if (i == threads.end()) {
            swoole_warning("AIO thread#%s is missing", get_thread_id(tid).c_str());
            return;
        } else {
            std::thread *_thread = i->second;
            swoole_trace_log(SW_TRACE_AIO,
                             "release idle thread#%s, we have %zu now",
                             get_thread_id(tid).c_str(),
                             threads.size() - 1);
            if (_thread->joinable()) {
                _thread->join();
            }
            threads.erase(i);
            delete _thread;
        }
    }

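    // Runs on the reactor thread when an idle worker's self-release event is
    // read from the pipe: reclaim the std::thread object. The release event
    // never went through dispatch(), so task_num is incremented here to
    // balance the decrement done for every event in AsyncThreads::callback().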
    static void release_callback(AsyncEvent *event) {
        std::thread::id *tid = reinterpret_cast<std::thread::id *>(event->object);
        SwooleTG.async_threads->pool->release_thread(*tid);
        delete tid;
        // balance
        SwooleTG.async_threads->task_num++;
    }

    void notify_one() {
        _cv.notify_one();
    }

  private:
    void create_thread(const bool is_core_worker = false);

    size_t core_worker_num;
    size_t worker_num;
    double max_wait_time;
    double max_idle_time;

    bool running;

    std::atomic<size_t> n_waiting;
    std::atomic<size_t> n_closing;
    size_t current_task_id = 0;
    std::unordered_map<std::thread::id, std::thread *> threads;
    EventQueue _queue;
    std::mutex event_mutex;
    std::condition_variable _cv;
};

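// Spawn one worker thread. A worker loops popping events under event_mutex and
// running their handlers, then writes each finished event's pointer back to
// the reactor through the pipe. With an empty queue it blocks on the condition
// variable; a non-core worker that stays idle beyond max_idle_time sends a
// release event to the reactor and exits.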
void ThreadPool::create_thread(const bool is_core_worker) {
    try {
        std::thread *_thread = new std::thread([this, is_core_worker]() {
            bool exit_flag = false;
            SwooleTG.buffer_stack = new String(SW_STACK_BUFFER_SIZE);
            ON_SCOPE_EXIT {
                delete SwooleTG.buffer_stack;
                SwooleTG.buffer_stack = nullptr;
            };

            swoole_signal_block_all();

            while (running) {
                event_mutex.lock();
                AsyncEvent *event = _queue.pop();
                event_mutex.unlock();

                swoole_debug("%s: %f", event ? "pop 1 event" : "no event", microtime());

                if (event) {
                    if (sw_unlikely(event->handler == nullptr)) {
                        event->error = SW_ERROR_AIO_BAD_REQUEST;
                        event->retval = -1;
                    } else if (sw_unlikely(event->canceled)) {
                        event->error = SW_ERROR_AIO_CANCELED;
                        event->retval = -1;
                    } else {
                        event->handler(event);
                    }

                    swoole_trace_log(SW_TRACE_AIO,
                                     "aio_thread %s. ret=%ld, error=%d",
                                     event->retval > 0 ? "ok" : "failed",
                                     event->retval,
                                     event->error);

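                    // Return the finished event to the reactor thread by
                    // writing its pointer into the notify pipe; retry while the
                    // pipe is full (EAGAIN) or the call is interrupted (EINTR).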
                _send_event:
                    while (true) {
                        ssize_t ret = event->pipe_socket->write(&event, sizeof(event));
                        if (ret < 0) {
                            if (errno == EAGAIN) {
                                event->pipe_socket->wait_event(1000, SW_EVENT_WRITE);
                                continue;
                            } else if (errno == EINTR) {
                                continue;
                            } else {
                                delete event;
                                swoole_sys_warning("failed to write the event to the aio pipe");
                            }
                        }
                        break;
                    }

                    // exit
                    if (exit_flag) {
                        n_closing--;
                        break;
                    }
                } else {
                    std::unique_lock<std::mutex> lock(event_mutex);
                    if (_queue.count() > 0) {
                        continue;
                    }
                    if (!running) {
                        break;
                    }
                    ++n_waiting;
                    if (is_core_worker || max_idle_time <= 0) {
                        _cv.wait(lock);
                    } else {
                        while (true) {
                            if (_cv.wait_for(lock, std::chrono::microseconds((size_t)(max_idle_time * 1000 * 1000))) ==
                                std::cv_status::timeout) {
                                if (running && n_closing != 0) {
                                    // wait for the next round
                                    continue;
                                }
                                /* notifies the main thread to release this thread */
                                event = new AsyncEvent;
                                event->object = new std::thread::id(std::this_thread::get_id());
                                event->callback = release_callback;
                                event->pipe_socket = SwooleG.aio_default_socket;
                                event->canceled = false;

                                --n_waiting;
                                ++n_closing;
                                exit_flag = true;
                                goto _send_event;
                            }
                            break;
                        }
                    }
                    --n_waiting;
                }
            }
        });
        threads[_thread->get_id()] = _thread;
    } catch (const std::system_error &e) {
        swoole_sys_notice("create aio thread failed, please check your system configuration or adjust aio_worker_num");
        return;
    }
}

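// Public dispatch entry point: lazily create the per-thread AsyncThreads
// instance, then hand the request to its thread pool and count the pending task.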
AsyncEvent *dispatch(const AsyncEvent *request) {
    if (sw_unlikely(!SwooleTG.async_threads)) {
        SwooleTG.async_threads = new AsyncThreads();
    }
    AsyncEvent *event = SwooleTG.async_threads->pool->dispatch(request);
    if (sw_likely(event)) {
        SwooleTG.async_threads->task_num++;
    }
    return event;
}

//-------------------------------------------------------------------------------
}  // namespace async

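// Reactor read handler for the notify pipe: drain up to SW_AIO_EVENT_NUM
// finished event pointers, invoke each event's callback unless it was
// canceled, and free the copies created by dispatch().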
int AsyncThreads::callback(Reactor *reactor, Event *event) {
    if (SwooleTG.async_threads->schedule) {
        SwooleTG.async_threads->pool->schedule();
    }

    AsyncEvent *events[SW_AIO_EVENT_NUM];
    ssize_t n = event->socket->read(events, sizeof(AsyncEvent *) * SW_AIO_EVENT_NUM);
    if (n < 0) {
        swoole_sys_warning("read() aio events failed");
        return SW_ERR;
    }
    for (size_t i = 0; i < n / sizeof(AsyncEvent *); i++) {
        AsyncEvent *event = events[i];
        if (!event->canceled) {
            event->callback(event);
        }
        SwooleTG.async_threads->task_num--;
        delete event;
    }

    return SW_OK;
}

size_t AsyncThreads::get_worker_num() {
    return pool ? pool->get_worker_num() : 0;
}

size_t AsyncThreads::get_queue_size() {
    return pool ? pool->get_queue_size() : 0;
}

void AsyncThreads::notify_one() {
    if (pool) {
        pool->notify_one();
    }
}

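// Per-thread setup: requires a running event loop. Creates the notify pipe,
// registers its read end with the reactor, installs a destroy callback that
// tears everything down with the reactor, and adds an exit condition so that
// in-flight AIO tasks keep the event loop alive.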
AsyncThreads::AsyncThreads() {
    if (!SwooleTG.reactor) {
        swoole_warning("no event loop, cannot initialize");
        throw swoole::Exception(SW_ERROR_WRONG_OPERATION);
    }

    pipe = new Pipe(false);
    if (!pipe->ready()) {
        delete pipe;
        pipe = nullptr;
        swoole_throw_error(SW_ERROR_SYSTEM_CALL_FAIL);
    }

    read_socket = pipe->get_socket(false);
    write_socket = pipe->get_socket(true);
    read_socket->fd_type = SW_FD_AIO;
    write_socket->fd_type = SW_FD_AIO;

    swoole_event_add(read_socket, SW_EVENT_READ);

    sw_reactor()->add_destroy_callback([](void *data) {
        if (!SwooleTG.async_threads) {
            return;
        }
        swoole_event_del(SwooleTG.async_threads->read_socket);
        delete SwooleTG.async_threads;
        SwooleTG.async_threads = nullptr;
    });

    sw_reactor()->set_exit_condition(Reactor::EXIT_CONDITION_AIO_TASK, [](Reactor *reactor, size_t &event_num) -> bool {
        if (SwooleTG.async_threads && SwooleTG.async_threads->task_num == 0) {
            event_num--;
        }
        return true;
    });

    init_lock.lock();
    pool = new async::ThreadPool(
        SwooleG.aio_core_worker_num, SwooleG.aio_worker_num, SwooleG.aio_max_wait_time, SwooleG.aio_max_idle_time);
    pool->start();
    schedule = true;
    init_lock.unlock();

    SwooleG.aio_default_socket = write_socket;
    SwooleTG.async_threads = this;
}

AsyncThreads::~AsyncThreads() {
    delete pool;
    pool = nullptr;
    pipe->close();
    read_socket = nullptr;
    write_socket = nullptr;
    delete pipe;
    pipe = nullptr;
}
}  // namespace swoole