1 /* 2 +----------------------------------------------------------------------+ 3 | Swoole | 4 +----------------------------------------------------------------------+ 5 | This source file is subject to version 2.0 of the Apache license, | 6 | that is bundled with this package in the file LICENSE, and is | 7 | available through the world-wide-web at the following url: | 8 | http://www.apache.org/licenses/LICENSE-2.0.html | 9 | If you did not receive a copy of the Apache2.0 license and are unable| 10 | to obtain it through the world-wide-web, please send a note to | 11 | license@swoole.com so we can mail you a copy immediately. | 12 +----------------------------------------------------------------------+ 13 | Author: Tianfeng Han <mikan.tenny@gmail.com> | 14 | Twosee <twose@qq.com> | 15 +----------------------------------------------------------------------+ 16 */ 17 18 #pragma once 19 20 #include "swoole.h" 21 #include "swoole_coroutine.h" 22 23 #include <sys/stat.h> 24 25 #include <iostream> 26 #include <string> 27 #include <list> 28 #include <queue> 29 30 namespace swoole { 31 namespace coroutine { 32 //------------------------------------------------------------------------------- 33 class Channel { 34 public: 35 enum Opcode { 36 PRODUCER = 1, 37 CONSUMER = 2, 38 }; 39 40 enum ErrorCode { 41 ERROR_OK = 0, 42 ERROR_TIMEOUT = -1, 43 ERROR_CLOSED = -2, 44 ERROR_CANCELED = -3, 45 }; 46 47 struct TimeoutMessage { 48 Channel *chan; 49 Opcode type; 50 Coroutine *co; 51 bool error; 52 TimerNode *timer; 53 }; 54 55 void *pop(double timeout = -1); 56 bool push(void *data, double timeout = -1); 57 bool close(); 58 59 Channel(size_t _capacity = 1) : capacity(_capacity) {} 60 61 ~Channel() { 62 if (!producer_queue.empty()) { 63 swoole_error_log(SW_LOG_WARNING, 64 SW_ERROR_CO_HAS_BEEN_DISCARDED, 65 "channel is destroyed, %zu producers will be discarded", 66 producer_queue.size()); 67 } 68 if (!consumer_queue.empty()) { 69 swoole_error_log(SW_LOG_WARNING, 70 SW_ERROR_CO_HAS_BEEN_DISCARDED, 71 "channel is destroyed, %zu consumers will be discarded", 72 consumer_queue.size()); 73 } 74 } 75 76 inline bool is_closed() { 77 return closed; 78 } 79 80 inline bool is_empty() { 81 return data_queue.size() == 0; 82 } 83 84 inline bool is_full() { 85 return data_queue.size() == capacity; 86 } 87 88 inline size_t length() { 89 return data_queue.size(); 90 } 91 92 inline size_t consumer_num() { 93 return consumer_queue.size(); 94 } 95 96 inline size_t producer_num() { 97 return producer_queue.size(); 98 } 99 100 inline void *pop_data() { 101 if (data_queue.size() == 0) { 102 return nullptr; 103 } 104 void *data = data_queue.front(); 105 data_queue.pop(); 106 return data; 107 } 108 109 int get_error() { 110 return error_; 111 } 112 113 protected: 114 size_t capacity = 1; 115 bool closed = false; 116 int error_ = 0; 117 std::list<Coroutine *> producer_queue; 118 std::list<Coroutine *> consumer_queue; 119 std::queue<void *> data_queue; 120 121 static void timer_callback(Timer *timer, TimerNode *tnode); 122 123 void yield(enum Opcode type); 124 125 inline void consumer_remove(Coroutine *co) { 126 consumer_queue.remove(co); 127 } 128 129 inline void producer_remove(Coroutine *co) { 130 producer_queue.remove(co); 131 } 132 133 inline Coroutine *pop_coroutine(enum Opcode type) { 134 Coroutine *co; 135 if (type == PRODUCER) { 136 co = producer_queue.front(); 137 producer_queue.pop_front(); 138 swoole_trace_log(SW_TRACE_CHANNEL, "resume producer cid=%ld", co->get_cid()); 139 } else // if (type == CONSUMER) 140 { 141 co = consumer_queue.front(); 142 consumer_queue.pop_front(); 143 swoole_trace_log(SW_TRACE_CHANNEL, "resume consumer cid=%ld", co->get_cid()); 144 } 145 return co; 146 } 147 }; 148 //------------------------------------------------------------------------------- 149 } // namespace coroutine 150 } // namespace swoole 151