1 /* 2 +----------------------------------------------------------------------+ 3 | Swoole | 4 +----------------------------------------------------------------------+ 5 | This source file is subject to version 2.0 of the Apache license, | 6 | that is bundled with this package in the file LICENSE, and is | 7 | available through the world-wide-web at the following url: | 8 | http://www.apache.org/licenses/LICENSE-2.0.html | 9 | If you did not receive a copy of the Apache2.0 license and are unable| 10 | to obtain it through the world-wide-web, please send a note to | 11 | license@swoole.com so we can mail you a copy immediately. | 12 +----------------------------------------------------------------------+ 13 | Author: Tianfeng Han <mikan.tenny@gmail.com> | 14 | Twosee <twose@qq.com> | 15 +----------------------------------------------------------------------+ 16 */ 17 18 #pragma once 19 20 #include "swoole_api.h" 21 #include "swoole_string.h" 22 #include "swoole_socket.h" 23 #include "swoole_reactor.h" 24 #include "swoole_timer.h" 25 #include "swoole_async.h" 26 27 #include "swoole_coroutine_context.h" 28 29 #include <limits.h> 30 31 #include <functional> 32 #include <string> 33 #include <unordered_map> 34 35 namespace swoole { 36 class Coroutine { 37 public: 38 constexpr static int STACK_ALIGNED_SIZE = (4 * 1024); 39 constexpr static int MIN_STACK_SIZE = (64 * 1024); 40 constexpr static int MAX_STACK_SIZE = (16 * 1024 * 1024); 41 constexpr static long MAX_NUM_LIMIT = LONG_MAX; 42 43 enum State { 44 STATE_INIT = 0, 45 STATE_WAITING, 46 STATE_RUNNING, 47 STATE_END, 48 }; 49 50 enum Error { 51 ERR_END = 0, 52 ERR_LIMIT = -1, 53 ERR_INVALID = -2, 54 }; 55 56 enum ResumeCode { 57 RC_OK = 0, 58 RC_TIMEDOUT = -1, 59 RC_CANCELED = -2, 60 }; 61 62 typedef void (*SwapCallback)(void *); 63 typedef void (*BailoutCallback)(); 64 typedef std::function<bool(swoole::Coroutine*)> CancelFunc; 65 66 void resume(); 67 void yield(); 68 void yield(CancelFunc *cancel_fn); 69 bool cancel(); 70 71 bool yield_ex(double timeout = -1); 72 get_state()73 inline enum State get_state() const { 74 return state; 75 } 76 get_init_msec()77 inline long get_init_msec() const { 78 return init_msec; 79 } 80 get_cid()81 inline long get_cid() const { 82 return cid; 83 } 84 get_origin()85 inline Coroutine *get_origin() { 86 return origin; 87 } 88 get_origin_cid()89 inline long get_origin_cid() { 90 return sw_likely(origin) ? origin->get_cid() : -1; 91 } 92 get_task()93 inline void *get_task() { 94 return task; 95 } 96 is_end()97 inline bool is_end() { 98 return ctx.is_end(); 99 } 100 is_canceled()101 bool is_canceled() const { 102 return resume_code_ == RC_CANCELED; 103 } 104 is_timedout()105 bool is_timedout() const { 106 return resume_code_ == RC_TIMEDOUT; 107 } 108 is_suspending()109 bool is_suspending() const { 110 return state == STATE_WAITING; 111 } 112 set_task(void * _task)113 inline void set_task(void *_task) { 114 task = _task; 115 } 116 set_cancel_fn(CancelFunc * cancel_fn)117 void set_cancel_fn(CancelFunc *cancel_fn) { 118 cancel_fn_ = cancel_fn; 119 } 120 121 static std::unordered_map<long, Coroutine *> coroutines; 122 123 static void set_on_yield(SwapCallback func); 124 static void set_on_resume(SwapCallback func); 125 static void set_on_close(SwapCallback func); 126 static void bailout(BailoutCallback func); 127 128 static inline long create(const CoroutineFunc &fn, void *args = nullptr) { 129 #ifdef SW_USE_THREAD_CONTEXT 130 try { 131 return (new Coroutine(fn, args))->run(); catch(const std::system_error & e)132 } catch (const std::system_error& e) { 133 swoole_set_last_error(e.code().value()); 134 swoole_warning("failed to create coroutine, Error: %s[%d]", e.what(), swoole_get_last_error()); 135 return -1; 136 } 137 #else 138 return (new Coroutine(fn, args))->run(); 139 #endif 140 } 141 142 static void activate(); 143 static void deactivate(); 144 get_current()145 static inline Coroutine *get_current() { 146 return current; 147 } 148 get_current_safe()149 static inline Coroutine *get_current_safe() { 150 if (sw_unlikely(!current)) { 151 swoole_fatal_error(SW_ERROR_CO_OUT_OF_COROUTINE, "API must be called in the coroutine"); 152 } 153 return current; 154 } 155 get_current_task()156 static inline void *get_current_task() { 157 return sw_likely(current) ? current->get_task() : nullptr; 158 } 159 get_current_cid()160 static inline long get_current_cid() { 161 return sw_likely(current) ? current->get_cid() : -1; 162 } 163 get_by_cid(long cid)164 static inline Coroutine *get_by_cid(long cid) { 165 auto i = coroutines.find(cid); 166 return sw_likely(i != coroutines.end()) ? i->second : nullptr; 167 } 168 get_task_by_cid(long cid)169 static inline void *get_task_by_cid(long cid) { 170 Coroutine *co = get_by_cid(cid); 171 return sw_likely(co) ? co->get_task() : nullptr; 172 } 173 get_stack_size()174 static inline size_t get_stack_size() { 175 return stack_size; 176 } 177 set_stack_size(size_t size)178 static inline void set_stack_size(size_t size) { 179 stack_size = SW_MEM_ALIGNED_SIZE_EX(SW_MAX(MIN_STACK_SIZE, SW_MIN(size, MAX_STACK_SIZE)), STACK_ALIGNED_SIZE); 180 } 181 get_last_cid()182 static inline long get_last_cid() { 183 return last_cid; 184 } 185 count()186 static inline size_t count() { 187 return coroutines.size(); 188 } 189 get_peak_num()190 static inline uint64_t get_peak_num() { 191 return peak_num; 192 } 193 get_elapsed(long cid)194 static inline long get_elapsed(long cid) { 195 Coroutine *co = cid == 0 ? get_current() : get_by_cid(cid); 196 return sw_likely(co) ? Timer::get_absolute_msec() - co->get_init_msec() : -1; 197 } 198 199 static void print_list(); 200 201 protected: 202 static Coroutine *current; 203 static long last_cid; 204 static uint64_t peak_num; 205 static size_t stack_size; 206 static SwapCallback on_yield; /* before yield */ 207 static SwapCallback on_resume; /* before resume */ 208 static SwapCallback on_close; /* before close */ 209 static BailoutCallback on_bailout; /* when bailout */ 210 static bool activated; 211 212 enum State state = STATE_INIT; 213 enum ResumeCode resume_code_ = RC_OK; 214 long cid; 215 long init_msec = Timer::get_absolute_msec(); 216 void *task = nullptr; 217 coroutine::Context ctx; 218 Coroutine *origin = nullptr; 219 CancelFunc *cancel_fn_ = nullptr; 220 Coroutine(const CoroutineFunc & fn,void * private_data)221 Coroutine(const CoroutineFunc &fn, void *private_data) : ctx(stack_size, fn, private_data) { 222 cid = ++last_cid; 223 coroutines[cid] = this; 224 if (sw_unlikely(count() > peak_num)) { 225 peak_num = count(); 226 } 227 } 228 run()229 inline long run() { 230 long cid = this->cid; 231 origin = current; 232 current = this; 233 ctx.swap_in(); 234 check_end(); 235 return cid; 236 } 237 check_end()238 inline void check_end() { 239 if (ctx.is_end()) { 240 close(); 241 } else if (sw_unlikely(on_bailout)) { 242 SW_ASSERT(current == nullptr); 243 on_bailout(); 244 // expect that never here 245 exit(1); 246 } 247 } 248 249 void close(); 250 }; 251 //------------------------------------------------------------------------------- 252 namespace coroutine { 253 bool async(async::Handler handler, AsyncEvent &event, double timeout = -1); 254 bool async(const std::function<void(void)> &fn, double timeout = -1); 255 bool run(const CoroutineFunc &fn, void *arg = nullptr); 256 } // namespace coroutine 257 //------------------------------------------------------------------------------- 258 } // namespace swoole 259 260 /** 261 * for gdb 262 */ 263 swoole::Coroutine *swoole_coroutine_iterator_each(); 264 void swoole_coroutine_iterator_reset(); 265 swoole::Coroutine *swoole_coroutine_get(long cid); 266 size_t swoole_coroutine_count(); 267