1 /*
2   +----------------------------------------------------------------------+
3   | Swoole                                                               |
4   +----------------------------------------------------------------------+
5   | This source file is subject to version 2.0 of the Apache license,    |
6   | that is bundled with this package in the file LICENSE, and is        |
7   | available through the world-wide-web at the following url:           |
8   | http://www.apache.org/licenses/LICENSE-2.0.html                      |
9   | If you did not receive a copy of the Apache2.0 license and are unable|
10   | to obtain it through the world-wide-web, please send a note to       |
11   | license@swoole.com so we can mail you a copy immediately.            |
12   +----------------------------------------------------------------------+
13   | Author: Tianfeng Han  <mikan.tenny@gmail.com>                        |
14   |         Twosee  <twose@qq.com>                                       |
15   +----------------------------------------------------------------------+
16 */
17 
18 #pragma once
19 
20 #include "swoole.h"
21 #include "swoole_coroutine.h"
22 
23 #include <sys/stat.h>
24 
25 #include <iostream>
26 #include <string>
27 #include <list>
28 #include <queue>
29 
30 namespace swoole {
31 namespace coroutine {
32 //-------------------------------------------------------------------------------
33 class Channel {
34   public:
35     enum Opcode {
36         PRODUCER = 1,
37         CONSUMER = 2,
38     };
39 
40     enum ErrorCode {
41         ERROR_OK = 0,
42         ERROR_TIMEOUT = -1,
43         ERROR_CLOSED = -2,
44         ERROR_CANCELED = -3,
45     };
46 
47     struct TimeoutMessage {
48         Channel *chan;
49         Opcode type;
50         Coroutine *co;
51         bool error;
52         TimerNode *timer;
53     };
54 
55     void *pop(double timeout = -1);
56     bool push(void *data, double timeout = -1);
57     bool close();
58 
59     Channel(size_t _capacity = 1) : capacity(_capacity) {}
60 
61     ~Channel() {
62         if (!producer_queue.empty()) {
63             swoole_error_log(SW_LOG_WARNING,
64                              SW_ERROR_CO_HAS_BEEN_DISCARDED,
65                              "channel is destroyed, %zu producers will be discarded",
66                              producer_queue.size());
67         }
68         if (!consumer_queue.empty()) {
69             swoole_error_log(SW_LOG_WARNING,
70                              SW_ERROR_CO_HAS_BEEN_DISCARDED,
71                              "channel is destroyed, %zu consumers will be discarded",
72                              consumer_queue.size());
73         }
74     }
75 
76     inline bool is_closed() {
77         return closed;
78     }
79 
80     inline bool is_empty() {
81         return data_queue.size() == 0;
82     }
83 
84     inline bool is_full() {
85         return data_queue.size() == capacity;
86     }
87 
88     inline size_t length() {
89         return data_queue.size();
90     }
91 
92     inline size_t consumer_num() {
93         return consumer_queue.size();
94     }
95 
96     inline size_t producer_num() {
97         return producer_queue.size();
98     }
99 
100     inline void *pop_data() {
101         if (data_queue.size() == 0) {
102             return nullptr;
103         }
104         void *data = data_queue.front();
105         data_queue.pop();
106         return data;
107     }
108 
109     int get_error() {
110         return error_;
111     }
112 
113   protected:
114     size_t capacity = 1;
115     bool closed = false;
116     int error_ = 0;
117     std::list<Coroutine *> producer_queue;
118     std::list<Coroutine *> consumer_queue;
119     std::queue<void *> data_queue;
120 
121     static void timer_callback(Timer *timer, TimerNode *tnode);
122 
123     void yield(enum Opcode type);
124 
125     inline void consumer_remove(Coroutine *co) {
126         consumer_queue.remove(co);
127     }
128 
129     inline void producer_remove(Coroutine *co) {
130         producer_queue.remove(co);
131     }
132 
133     inline Coroutine *pop_coroutine(enum Opcode type) {
134         Coroutine *co;
135         if (type == PRODUCER) {
136             co = producer_queue.front();
137             producer_queue.pop_front();
138             swoole_trace_log(SW_TRACE_CHANNEL, "resume producer cid=%ld", co->get_cid());
139         } else  // if (type == CONSUMER)
140         {
141             co = consumer_queue.front();
142             consumer_queue.pop_front();
143             swoole_trace_log(SW_TRACE_CHANNEL, "resume consumer cid=%ld", co->get_cid());
144         }
145         return co;
146     }
147 };
148 //-------------------------------------------------------------------------------
149 }  // namespace coroutine
150 }  // namespace swoole
151