1 /*
2   +----------------------------------------------------------------------+
3   | Swoole                                                               |
4   +----------------------------------------------------------------------+
5   | This source file is subject to version 2.0 of the Apache license,    |
6   | that is bundled with this package in the file LICENSE, and is        |
7   | available through the world-wide-web at the following url:           |
8   | http://www.apache.org/licenses/LICENSE-2.0.html                      |
9   | If you did not receive a copy of the Apache2.0 license and are unable|
10   | to obtain it through the world-wide-web, please send a note to       |
11   | license@swoole.com so we can mail you a copy immediately.            |
12   +----------------------------------------------------------------------+
13   | Author: Tianfeng Han  <mikan.tenny@gmail.com>                        |
14   |         Twosee  <twose@qq.com>                                       |
15   +----------------------------------------------------------------------+
16 */
17 
18 #pragma once
19 
20 #include "swoole_api.h"
21 #include "swoole_string.h"
22 #include "swoole_socket.h"
23 #include "swoole_reactor.h"
24 #include "swoole_timer.h"
25 #include "swoole_async.h"
26 
27 #include "swoole_coroutine_context.h"
28 
29 #include <limits.h>
30 
31 #include <functional>
32 #include <string>
33 #include <unordered_map>
34 
35 namespace swoole {
36 class Coroutine {
37   public:
38     constexpr static int STACK_ALIGNED_SIZE = (4 * 1024);
39     constexpr static int MIN_STACK_SIZE = (64 * 1024);
40     constexpr static int MAX_STACK_SIZE = (16 * 1024 * 1024);
41     constexpr static long MAX_NUM_LIMIT = LONG_MAX;
42 
43     enum State {
44         STATE_INIT = 0,
45         STATE_WAITING,
46         STATE_RUNNING,
47         STATE_END,
48     };
49 
50     enum Error {
51         ERR_END = 0,
52         ERR_LIMIT = -1,
53         ERR_INVALID = -2,
54     };
55 
56     enum ResumeCode {
57         RC_OK = 0,
58         RC_TIMEDOUT = -1,
59         RC_CANCELED = -2,
60     };
61 
62     typedef void (*SwapCallback)(void *);
63     typedef void (*BailoutCallback)();
64     typedef std::function<bool(swoole::Coroutine*)> CancelFunc;
65 
66     void resume();
67     void yield();
68     void yield(CancelFunc *cancel_fn);
69     bool cancel();
70 
71     bool yield_ex(double timeout = -1);
72 
get_state()73     inline enum State get_state() const {
74         return state;
75     }
76 
get_init_msec()77     inline long get_init_msec() const  {
78         return init_msec;
79     }
80 
get_cid()81     inline long get_cid() const {
82         return cid;
83     }
84 
get_origin()85     inline Coroutine *get_origin() {
86         return origin;
87     }
88 
get_origin_cid()89     inline long get_origin_cid() {
90         return sw_likely(origin) ? origin->get_cid() : -1;
91     }
92 
get_task()93     inline void *get_task() {
94         return task;
95     }
96 
is_end()97     inline bool is_end() {
98         return ctx.is_end();
99     }
100 
is_canceled()101     bool is_canceled() const {
102         return resume_code_ == RC_CANCELED;
103     }
104 
is_timedout()105     bool is_timedout() const {
106         return resume_code_ == RC_TIMEDOUT;
107     }
108 
is_suspending()109     bool is_suspending() const {
110         return state == STATE_WAITING;
111     }
112 
set_task(void * _task)113     inline void set_task(void *_task) {
114         task = _task;
115     }
116 
set_cancel_fn(CancelFunc * cancel_fn)117     void set_cancel_fn(CancelFunc *cancel_fn) {
118         cancel_fn_ = cancel_fn;
119     }
120 
121     static std::unordered_map<long, Coroutine *> coroutines;
122 
123     static void set_on_yield(SwapCallback func);
124     static void set_on_resume(SwapCallback func);
125     static void set_on_close(SwapCallback func);
126     static void bailout(BailoutCallback func);
127 
128     static inline long create(const CoroutineFunc &fn, void *args = nullptr) {
129 #ifdef SW_USE_THREAD_CONTEXT
130         try {
131             return (new Coroutine(fn, args))->run();
catch(const std::system_error & e)132         } catch (const std::system_error& e) {
133             swoole_set_last_error(e.code().value());
134             swoole_warning("failed to create coroutine, Error: %s[%d]", e.what(), swoole_get_last_error());
135             return -1;
136         }
137 #else
138         return (new Coroutine(fn, args))->run();
139 #endif
140     }
141 
142     static void activate();
143     static void deactivate();
144 
get_current()145     static inline Coroutine *get_current() {
146         return current;
147     }
148 
get_current_safe()149     static inline Coroutine *get_current_safe() {
150         if (sw_unlikely(!current)) {
151             swoole_fatal_error(SW_ERROR_CO_OUT_OF_COROUTINE, "API must be called in the coroutine");
152         }
153         return current;
154     }
155 
get_current_task()156     static inline void *get_current_task() {
157         return sw_likely(current) ? current->get_task() : nullptr;
158     }
159 
get_current_cid()160     static inline long get_current_cid() {
161         return sw_likely(current) ? current->get_cid() : -1;
162     }
163 
get_by_cid(long cid)164     static inline Coroutine *get_by_cid(long cid) {
165         auto i = coroutines.find(cid);
166         return sw_likely(i != coroutines.end()) ? i->second : nullptr;
167     }
168 
get_task_by_cid(long cid)169     static inline void *get_task_by_cid(long cid) {
170         Coroutine *co = get_by_cid(cid);
171         return sw_likely(co) ? co->get_task() : nullptr;
172     }
173 
get_stack_size()174     static inline size_t get_stack_size() {
175         return stack_size;
176     }
177 
set_stack_size(size_t size)178     static inline void set_stack_size(size_t size) {
179         stack_size = SW_MEM_ALIGNED_SIZE_EX(SW_MAX(MIN_STACK_SIZE, SW_MIN(size, MAX_STACK_SIZE)), STACK_ALIGNED_SIZE);
180     }
181 
get_last_cid()182     static inline long get_last_cid() {
183         return last_cid;
184     }
185 
count()186     static inline size_t count() {
187         return coroutines.size();
188     }
189 
get_peak_num()190     static inline uint64_t get_peak_num() {
191         return peak_num;
192     }
193 
get_elapsed(long cid)194     static inline long get_elapsed(long cid) {
195         Coroutine *co = cid == 0 ? get_current() : get_by_cid(cid);
196         return sw_likely(co) ? Timer::get_absolute_msec() - co->get_init_msec() : -1;
197     }
198 
199     static void print_list();
200 
201   protected:
202     static Coroutine *current;
203     static long last_cid;
204     static uint64_t peak_num;
205     static size_t stack_size;
206     static SwapCallback on_yield;   /* before yield */
207     static SwapCallback on_resume;  /* before resume */
208     static SwapCallback on_close;   /* before close */
209     static BailoutCallback on_bailout; /* when bailout */
210     static bool activated;
211 
212     enum State state = STATE_INIT;
213     enum ResumeCode resume_code_ = RC_OK;
214     long cid;
215     long init_msec = Timer::get_absolute_msec();
216     void *task = nullptr;
217     coroutine::Context ctx;
218     Coroutine *origin = nullptr;
219     CancelFunc *cancel_fn_ = nullptr;
220 
Coroutine(const CoroutineFunc & fn,void * private_data)221     Coroutine(const CoroutineFunc &fn, void *private_data) : ctx(stack_size, fn, private_data) {
222         cid = ++last_cid;
223         coroutines[cid] = this;
224         if (sw_unlikely(count() > peak_num)) {
225             peak_num = count();
226         }
227     }
228 
run()229     inline long run() {
230         long cid = this->cid;
231         origin = current;
232         current = this;
233         ctx.swap_in();
234         check_end();
235         return cid;
236     }
237 
check_end()238     inline void check_end() {
239         if (ctx.is_end()) {
240             close();
241         } else if (sw_unlikely(on_bailout)) {
242             SW_ASSERT(current == nullptr);
243             on_bailout();
244             // expect that never here
245             exit(1);
246         }
247     }
248 
249     void close();
250 };
251 //-------------------------------------------------------------------------------
252 namespace coroutine {
253 bool async(async::Handler handler, AsyncEvent &event, double timeout = -1);
254 bool async(const std::function<void(void)> &fn, double timeout = -1);
255 bool run(const CoroutineFunc &fn, void *arg = nullptr);
256 }  // namespace coroutine
257 //-------------------------------------------------------------------------------
258 }  // namespace swoole
259 
260 /**
261  * for gdb
262  */
263 swoole::Coroutine *swoole_coroutine_iterator_each();
264 void swoole_coroutine_iterator_reset();
265 swoole::Coroutine *swoole_coroutine_get(long cid);
266 size_t swoole_coroutine_count();
267