1 /*
2 +----------------------------------------------------------------------+
3 | Swoole |
4 +----------------------------------------------------------------------+
5 | This source file is subject to version 2.0 of the Apache license, |
6 | that is bundled with this package in the file LICENSE, and is |
7 | available through the world-wide-web at the following url: |
8 | http://www.apache.org/licenses/LICENSE-2.0.html |
9 | If you did not receive a copy of the Apache2.0 license and are unable|
10 | to obtain it through the world-wide-web, please send a note to |
11 | license@swoole.com so we can mail you a copy immediately. |
12 +----------------------------------------------------------------------+
13 | Author: Tianfeng Han <mikan.tenny@gmail.com> |
14 +----------------------------------------------------------------------+
15 */
16
17 #include "swoole_api.h"
18 #include "swoole_socket.h"
19 #include "swoole_reactor.h"
20 #include "swoole_string.h"
21 #include "swoole_signal.h"
22 #include "swoole_pipe.h"
23 #include "swoole_async.h"
24 #include "swoole_util.h"
25
26 #include <thread>
#include <atomic>
28 #include <unordered_map>
29 #include <chrono>
30 #include <condition_variable>
31 #include <mutex>
32 #include <queue>
33 #include <sstream>
34
35 namespace swoole {
36 namespace async {
37 //-------------------------------------------------------------------------------
38 class EventQueue {
39 public:
40 inline void push(AsyncEvent *event) {
41 _queue.push(event);
42 }
43
44 inline AsyncEvent *pop() {
45 if (_queue.empty()) {
46 return nullptr;
47 }
48 AsyncEvent *retval = _queue.front();
49 _queue.pop();
50 return retval;
51 }
52
53 inline double get_max_wait_time() {
54 if (_queue.empty()) {
55 return 0;
56 } else {
57 AsyncEvent *event = _queue.front();
58 return microtime() - event->timestamp;
59 }
60 }
61
62 inline size_t count() {
63 return _queue.size();
64 }
65
66 private:
67 std::queue<AsyncEvent *> _queue;
68 };
69
70 class ThreadPool {
71 public:
72 ThreadPool(size_t _core_worker_num, size_t _worker_num, double _max_wait_time, double _max_idle_time) {
73 running = false;
74
75 core_worker_num = _core_worker_num == 0 ? SW_CPU_NUM : SW_MAX(1, _core_worker_num);
76 worker_num = _worker_num == 0 ? SW_CPU_NUM * SW_AIO_THREAD_NUM_MULTIPLE : SW_MAX(core_worker_num, _worker_num);
77 max_wait_time = _max_wait_time == 0 ? SW_AIO_TASK_MAX_WAIT_TIME : _max_wait_time;
78 max_idle_time = _max_idle_time == 0 ? SW_AIO_THREAD_MAX_IDLE_TIME : _max_idle_time;
79 }
80
81 ~ThreadPool() {
82 shutdown();
83 }
84
85 bool start() {
86 running = true;
87 current_task_id = 0;
88 n_waiting = 0;
89 n_closing = 0;
90 for (size_t i = 0; i < core_worker_num; i++) {
91 create_thread(true);
92 }
93 return true;
94 }
95
96 bool shutdown() {
97 if (!running) {
98 return false;
99 }
100
101 event_mutex.lock();
102 running = false;
103 _cv.notify_all();
104 event_mutex.unlock();
105
106 for (auto &i : threads) {
107 std::thread *_thread = i.second;
108 if (_thread->joinable()) {
109 _thread->join();
110 }
111 delete _thread;
112 }
113
114 return true;
115 }
116
117 void schedule() {
118 if (n_waiting == 0 && threads.size() < worker_num && max_wait_time > 0) {
119 event_mutex.lock();
120 double _max_wait_time = _queue.get_max_wait_time();
121 event_mutex.unlock();
122
123 if (_max_wait_time > max_wait_time) {
124 size_t n = 1;
125 /**
126 * maybe we can find a better strategy
127 */
128 if (threads.size() + n > worker_num) {
129 n = worker_num - threads.size();
130 }
131 swoole_trace_log(SW_TRACE_AIO,
132 "Create %zu thread due to wait %fs, we will have %zu threads",
133 n,
134 _max_wait_time,
135 threads.size() + n);
136 while (n--) {
137 create_thread();
138 }
139 }
140 }
141 }
142
143 AsyncEvent *dispatch(const AsyncEvent *request) {
144 if (SwooleTG.async_threads->schedule) {
145 schedule();
146 }
147 auto _event_copy = new AsyncEvent(*request);
148 _event_copy->task_id = current_task_id++;
149 _event_copy->timestamp = microtime();
150 _event_copy->pipe_socket = SwooleTG.async_threads->write_socket;
151 event_mutex.lock();
152 _queue.push(_event_copy);
153 _cv.notify_one();
154 event_mutex.unlock();
155 swoole_debug("push and notify one: %f", microtime());
156 return _event_copy;
157 }
158
159 inline size_t get_worker_num() {
160 return threads.size();
161 }
162
163 inline size_t get_queue_size() {
164 std::unique_lock<std::mutex> lock(event_mutex);
165 return _queue.count();
166 }
167
168 static std::string get_thread_id(std::thread::id id) {
169 std::stringstream ss;
170 ss << id;
171 return ss.str();
172 }
173
174 void release_thread(std::thread::id tid) {
175 auto i = threads.find(tid);
176 if (i == threads.end()) {
177 swoole_warning("AIO thread#%s is missing", get_thread_id(tid).c_str());
178 return;
179 } else {
180 std::thread *_thread = i->second;
181 swoole_trace_log(SW_TRACE_AIO,
182 "release idle thread#%s, we have %zu now",
183 get_thread_id(tid).c_str(),
184 threads.size() - 1);
185 if (_thread->joinable()) {
186 _thread->join();
187 }
188 threads.erase(i);
189 delete _thread;
190 }
191 }
192
193 static void release_callback(AsyncEvent *event) {
194 std::thread::id *tid = reinterpret_cast<std::thread::id *>(event->object);
195 SwooleTG.async_threads->pool->release_thread(*tid);
196 delete tid;
197 // balance
198 SwooleTG.async_threads->task_num++;
199 }
200
201 void notify_one() {
202 _cv.notify_one();
203 }
204
205 private:
206 void create_thread(const bool is_core_worker = false);
207
208 size_t core_worker_num;
209 size_t worker_num;
210 double max_wait_time;
211 double max_idle_time;
212
213 bool running;
214
215 std::atomic<size_t> n_waiting;
216 std::atomic<size_t> n_closing;
217 size_t current_task_id = 0;
218 std::unordered_map<std::thread::id, std::thread *> threads;
219 EventQueue _queue;
220 std::mutex event_mutex;
221 std::condition_variable _cv;
222 };
223
/**
 * Spawns one worker thread and registers it in `threads` under its thread id.
 *
 * The worker repeatedly pops an event from the queue, runs its handler and
 * writes the event pointer to the event's pipe socket. When the queue is empty
 * it waits on the condition variable: core workers (or any worker when
 * max_idle_time <= 0) wait without a timeout; other workers wait up to
 * max_idle_time seconds and, on timeout, send a release event and exit.
 *
 * A std::system_error thrown while creating the thread is caught and logged;
 * in that case no thread is added.
 *
 * @param is_core_worker true for workers that never exit on idle timeout
 */
void ThreadPool::create_thread(const bool is_core_worker) {
    try {
        std::thread *_thread = new std::thread([this, is_core_worker]() {
            // Set when this worker has decided to exit after sending its release event.
            bool exit_flag = false;
            // Per-thread buffer, freed again when the lambda returns.
            SwooleTG.buffer_stack = new String(SW_STACK_BUFFER_SIZE);
            ON_SCOPE_EXIT {
                delete SwooleTG.buffer_stack;
                SwooleTG.buffer_stack = nullptr;
            };

            swoole_signal_block_all();

            while (running) {
                // Hold the mutex only for the pop itself, not while the handler runs.
                event_mutex.lock();
                AsyncEvent *event = _queue.pop();
                event_mutex.unlock();

                swoole_debug("%s: %f", event ? "pop 1 event" : "no event", microtime());

                if (event) {
                    if (sw_unlikely(event->handler == nullptr)) {
                        event->error = SW_ERROR_AIO_BAD_REQUEST;
                        event->retval = -1;
                    } else if (sw_unlikely(event->canceled)) {
                        // Canceled events skip the handler but are still sent back below.
                        event->error = SW_ERROR_AIO_CANCELED;
                        event->retval = -1;
                    } else {
                        event->handler(event);
                    }

                    swoole_trace_log(SW_TRACE_AIO,
                                     "aio_thread %s. ret=%ld, error=%d",
                                     event->retval > 0 ? "ok" : "failed",
                                     event->retval,
                                     event->error);

                // Also the jump target of the idle-timeout path below, which
                // arrives here with a freshly allocated release event.
                _send_event:
                    while (true) {
                        // Only the pointer value is written to the pipe, not the event itself.
                        ssize_t ret = event->pipe_socket->write(&event, sizeof(event));
                        if (ret < 0) {
                            if (errno == EAGAIN) {
                                // Pipe not writable yet: wait for writability, then retry.
                                event->pipe_socket->wait_event(1000, SW_EVENT_WRITE);
                                continue;
                            } else if (errno == EINTR) {
                                continue;
                            } else {
                                // The pointer could not be sent, so free the event here.
                                delete event;
                                swoole_sys_warning("sendto swoole_aio_pipe_write failed");
                            }
                        }
                        break;
                    }

                    // The release event has been handed off (or dropped): leave the worker loop.
                    if (exit_flag) {
                        n_closing--;
                        break;
                    }
                } else {
                    std::unique_lock<std::mutex> lock(event_mutex);
                    // Re-check under the lock: an event may have been queued since the pop above.
                    if (_queue.count() > 0) {
                        continue;
                    }
                    if (!running) {
                        break;
                    }
                    ++n_waiting;
                    if (is_core_worker || max_idle_time <= 0) {
                        _cv.wait(lock);
                    } else {
                        while (true) {
                            if (_cv.wait_for(lock, std::chrono::microseconds((size_t)(max_idle_time * 1000 * 1000))) ==
                                std::cv_status::timeout) {
                                if (running && n_closing != 0) {
                                    // Another worker is already closing: wait for the next round.
                                    continue;
                                }
                                /* notifies the main thread to release this thread */
                                event = new AsyncEvent;
                                // Heap-allocated id; release_callback reads and deletes it.
                                event->object = new std::thread::id(std::this_thread::get_id());
                                event->callback = release_callback;
                                event->pipe_socket = SwooleG.aio_default_socket;
                                event->canceled = false;

                                --n_waiting;
                                ++n_closing;
                                exit_flag = true;
                                // Leaving this scope destroys `lock`, which releases the mutex.
                                goto _send_event;
                            }
                            // Woken before the timeout: go back to the main loop.
                            break;
                        }
                    }
                    --n_waiting;
                }
            }
        });
        threads[_thread->get_id()] = _thread;
    } catch (const std::system_error &e) {
        swoole_sys_notice("create aio thread failed, please check your system configuration or adjust aio_worker_num");
        return;
    }
}
326
327 AsyncEvent *dispatch(const AsyncEvent *request) {
328 if (sw_unlikely(!SwooleTG.async_threads)) {
329 SwooleTG.async_threads = new AsyncThreads();
330 }
331 AsyncEvent *event = SwooleTG.async_threads->pool->dispatch(request);
332 if (sw_likely(event)) {
333 SwooleTG.async_threads->task_num++;
334 }
335 return event;
336 }
337
338 //-------------------------------------------------------------------------------
339 } // namespace async
340
341 int AsyncThreads::callback(Reactor *reactor, Event *event) {
342 if (SwooleTG.async_threads->schedule) {
343 SwooleTG.async_threads->pool->schedule();
344 }
345
346 AsyncEvent *events[SW_AIO_EVENT_NUM];
347 ssize_t n = event->socket->read(events, sizeof(AsyncEvent *) * SW_AIO_EVENT_NUM);
348 if (n < 0) {
349 swoole_sys_warning("read() aio events failed");
350 return SW_ERR;
351 }
352 for (size_t i = 0; i < n / sizeof(AsyncEvent *); i++) {
353 AsyncEvent *event = events[i];
354 if (!event->canceled) {
355 event->callback(event);
356 }
357 SwooleTG.async_threads->task_num--;
358 delete event;
359 }
360
361 return SW_OK;
362 }
363
364 size_t AsyncThreads::get_worker_num() {
365 return pool ? pool->get_worker_num() : 0;
366 }
367
368 size_t AsyncThreads::get_queue_size() {
369 return pool ? pool->get_queue_size() : 0;
370 }
371
372 void AsyncThreads::notify_one() {
373 if (pool) {
374 pool->notify_one();
375 }
376 }
377
378 AsyncThreads::AsyncThreads() {
379 if (!SwooleTG.reactor) {
380 swoole_warning("no event loop, cannot initialized");
381 throw swoole::Exception(SW_ERROR_WRONG_OPERATION);
382 }
383
384 pipe = new Pipe(false);
385 if (!pipe->ready()) {
386 delete pipe;
387 pipe = nullptr;
388 swoole_throw_error(SW_ERROR_SYSTEM_CALL_FAIL);
389 }
390
391 read_socket = pipe->get_socket(false);
392 write_socket = pipe->get_socket(true);
393 read_socket->fd_type = SW_FD_AIO;
394 write_socket->fd_type = SW_FD_AIO;
395
396 swoole_event_add(read_socket, SW_EVENT_READ);
397
398 sw_reactor()->add_destroy_callback([](void *data) {
399 if (!SwooleTG.async_threads) {
400 return;
401 }
402 swoole_event_del(SwooleTG.async_threads->read_socket);
403 delete SwooleTG.async_threads;
404 SwooleTG.async_threads = nullptr;
405 });
406
407 sw_reactor()->set_exit_condition(Reactor::EXIT_CONDITION_AIO_TASK, [](Reactor *reactor, size_t &event_num) -> bool {
408 if (SwooleTG.async_threads && SwooleTG.async_threads->task_num == 0) {
409 event_num--;
410 }
411 return true;
412 });
413
414 init_lock.lock();
415 pool = new async::ThreadPool(
416 SwooleG.aio_core_worker_num, SwooleG.aio_worker_num, SwooleG.aio_max_wait_time, SwooleG.aio_max_idle_time);
417 pool->start();
418 schedule = true;
419 init_lock.unlock();
420
421 SwooleG.aio_default_socket = write_socket;
422 SwooleTG.async_threads = this;
423 }
424
425 AsyncThreads::~AsyncThreads() {
426 delete pool;
427 pool = nullptr;
428 pipe->close();
429 read_socket = nullptr;
430 write_socket = nullptr;
431 delete pipe;
432 pipe = nullptr;
433 }
434 }; // namespace swoole
435