1 /*
2   +----------------------------------------------------------------------+
3   | Swoole                                                               |
4   +----------------------------------------------------------------------+
arcan_shmifext_defaults(struct arcan_shmif_cont * con)5   | This source file is subject to version 2.0 of the Apache license,    |
6   | that is bundled with this package in the file LICENSE, and is        |
7   | available through the world-wide-web at the following url:           |
8   | http://www.apache.org/licenses/LICENSE-2.0.html                      |
9   | If you did not receive a copy of the Apache2.0 license and are unable|
10   | to obtain it through the world-wide-web, please send a note to       |
11   | license@swoole.com so we can mail you a copy immediately.            |
12   +----------------------------------------------------------------------+
13   | @link     https://www.swoole.com/                                    |
14   | @contact  team@swoole.com                                            |
15   | @license  https://github.com/swoole/swoole-src/blob/master/LICENSE   |
16   | @author   Tianfeng Han  <mikan.tenny@gmail.com>                      |
17   +----------------------------------------------------------------------+
18 */
19 
20 #include "swoole.h"
21 #include "swoole_memory.h"
22 #include "swoole_channel.h"
23 #include "swoole_lock.h"
24 #include "swoole_pipe.h"
25 
26 namespace swoole {
27 
28 #define SW_CHANNEL_MIN_MEM (1024 * 64)
29 
30 struct ChannelSlice {
31     int length;
32     char data[0];
33 };
34 
35 Channel *Channel::make(size_t size, size_t maxlen, int flags) {
36     assert(size >= maxlen);
37     void *mem;
38 
39     // use shared memory
40     if (flags & SW_CHAN_SHM) {
41         /**
42          * overflow space
43          */
44         mem = sw_shm_malloc(size + sizeof(Channel) + maxlen + sizeof(ChannelSlice));
45     } else {
46         mem = sw_malloc(size + sizeof(Channel) + maxlen + sizeof(ChannelSlice));
47     }
48 
49     if (mem == nullptr) {
50         swoole_warning("alloc(%ld) failed", size);
51         return nullptr;
52     }
53 
54     Channel *object = (Channel *) mem;
55     mem = (char *) mem + sizeof(Channel);
56 
57     *object = {};
58 
59     // overflow space
60     object->size = size;
61     object->mem = mem;
62     object->maxlen = maxlen;
63     object->flags = flags;
64 
65     // use lock
66     if (flags & SW_CHAN_LOCK) {
67         // init lock
68         object->lock = new Mutex(Mutex::PROCESS_SHARED);
69     }
70     // use notify
71     if (flags & SW_CHAN_NOTIFY) {
72         object->notify_pipe = new Pipe(true);
73         if (!object->notify_pipe->ready()) {
74             swoole_warning("notify_fd init failed");
75             delete object->notify_pipe;
76             return nullptr;
77         }
78     }
79 
80     return object;
81 }
82 
83 /**
84  * push data(no lock)
85  */
86 int Channel::in(const void *in_data, int data_length) {
87     assert(data_length <= maxlen);
88     if (full()) {
89         return SW_ERR;
90     }
91     ChannelSlice *item;
92     int msize = sizeof(item->length) + data_length;
93 
94     if (tail < head) {
95         // no enough memory space
96         if ((head - tail) < msize) {
97             return SW_ERR;
98         }
99         item = (ChannelSlice *) ((char *) mem + tail);
100         tail += msize;
101     } else {
102         item = (ChannelSlice *) ((char *) mem + tail);
103         tail += msize;
104         if (tail >= (off_t) size) {
105             tail = 0;
106             tail_tag = 1 - tail_tag;
107         }
108     }
109     num++;
110     bytes += data_length;
111     item->length = data_length;
112     memcpy(item->data, in_data, data_length);
113     return SW_OK;
114 }
115 
116 /**
117  * pop data(no lock)
118  */
119 int Channel::out(void *out_buf, int buffer_length) {
120     if (empty()) {
121         return SW_ERR;
122     }
123 
124     ChannelSlice *item = (ChannelSlice *) ((char *) mem + head);
125     assert(buffer_length >= item->length);
126     memcpy(out_buf, item->data, item->length);
127     head += (item->length + sizeof(item->length));
128     if (head >= (off_t) size) {
129         head = 0;
130         head_tag = 1 - head_tag;
131     }
132     num--;
133     bytes -= item->length;
134     return item->length;
135 }
136 
137 /**
138  * peek data
139  */
140 int Channel::peek(void *out, int buffer_length) {
141     if (empty()) {
142         return SW_ERR;
143     }
144 
145     int length;
146     lock->lock();
147     ChannelSlice *item = (ChannelSlice *) ((char *) mem + head);
148     assert(buffer_length >= item->length);
149     memcpy(out, item->data, item->length);
150     length = item->length;
151     lock->unlock();
152 
153     return length;
154 }
155 
156 /**
157  * wait notify
158  */
159 int Channel::wait() {
160     assert(flags & SW_CHAN_NOTIFY);
161     uint64_t value;
162     return notify_pipe->read(&value, sizeof(value));
163 }
164 
165 /**
166  * new data coming, notify to customer
167  */
168 int Channel::notify() {
169     assert(flags & SW_CHAN_NOTIFY);
170     uint64_t value = 1;
171     return notify_pipe->write(&value, sizeof(value));
172 }
173 
174 /**
175  * push data (lock)
176  */
177 int Channel::push(const void *in_data, int data_length) {
178     assert(flags & SW_CHAN_LOCK);
179     lock->lock();
180     int ret = in(in_data, data_length);
181     lock->unlock();
182     return ret;
183 }
184 
185 /**
186  * free channel
187  */
188 void Channel::destroy() {
189     if (flags & SW_CHAN_LOCK) {
190         delete lock;
191     }
192     if (flags & SW_CHAN_NOTIFY) {
193         notify_pipe->close();
194         delete notify_pipe;
195     }
196     if (flags & SW_CHAN_SHM) {
197         sw_shm_free(this);
198     } else {
199         sw_free(this);
200     }
201 }
202 
203 /**
204  * pop data (lock)
205  */
206 int Channel::pop(void *out_buf, int buffer_length) {
207     assert(flags & SW_CHAN_LOCK);
208     lock->lock();
209     int n = out(out_buf, buffer_length);
210     lock->unlock();
211     return n;
212 }
213 
214 void Channel::print() {
215     printf("Channel\n{\n"
216            "    off_t head = %ld;\n"
217            "    off_t tail = %ld;\n"
218            "    size_t size = %ld;\n"
219            "    char head_tag = %d;\n"
220            "    char tail_tag = %d;\n"
221            "    int num = %d;\n"
222            "    size_t bytes = %ld;\n"
223            "    int flag = %d;\n"
224            "    int maxlen = %d;\n"
225            "\n}\n",
226            (long) head,
227            (long) tail,
228            size,
229            tail_tag,
230            head_tag,
231            num,
232            bytes,
233            flags,
234            maxlen);
235 }
236 
237 }  // namespace swoole
238