1 /* 2 +----------------------------------------------------------------------+ 3 | Swoole | 4 +----------------------------------------------------------------------+ arcan_shmifext_defaults(struct arcan_shmif_cont * con)5 | This source file is subject to version 2.0 of the Apache license, | 6 | that is bundled with this package in the file LICENSE, and is | 7 | available through the world-wide-web at the following url: | 8 | http://www.apache.org/licenses/LICENSE-2.0.html | 9 | If you did not receive a copy of the Apache2.0 license and are unable| 10 | to obtain it through the world-wide-web, please send a note to | 11 | license@swoole.com so we can mail you a copy immediately. | 12 +----------------------------------------------------------------------+ 13 | @link https://www.swoole.com/ | 14 | @contact team@swoole.com | 15 | @license https://github.com/swoole/swoole-src/blob/master/LICENSE | 16 | @author Tianfeng Han <mikan.tenny@gmail.com> | 17 +----------------------------------------------------------------------+ 18 */ 19 20 #include "swoole.h" 21 #include "swoole_memory.h" 22 #include "swoole_channel.h" 23 #include "swoole_lock.h" 24 #include "swoole_pipe.h" 25 26 namespace swoole { 27 28 #define SW_CHANNEL_MIN_MEM (1024 * 64) 29 30 struct ChannelSlice { 31 int length; 32 char data[0]; 33 }; 34 35 Channel *Channel::make(size_t size, size_t maxlen, int flags) { 36 assert(size >= maxlen); 37 void *mem; 38 39 // use shared memory 40 if (flags & SW_CHAN_SHM) { 41 /** 42 * overflow space 43 */ 44 mem = sw_shm_malloc(size + sizeof(Channel) + maxlen + sizeof(ChannelSlice)); 45 } else { 46 mem = sw_malloc(size + sizeof(Channel) + maxlen + sizeof(ChannelSlice)); 47 } 48 49 if (mem == nullptr) { 50 swoole_warning("alloc(%ld) failed", size); 51 return nullptr; 52 } 53 54 Channel *object = (Channel *) mem; 55 mem = (char *) mem + sizeof(Channel); 56 57 *object = {}; 58 59 // overflow space 60 object->size = size; 61 object->mem = mem; 62 object->maxlen = maxlen; 63 object->flags = flags; 64 65 // use lock 66 if (flags & SW_CHAN_LOCK) { 67 // init lock 68 object->lock = new Mutex(Mutex::PROCESS_SHARED); 69 } 70 // use notify 71 if (flags & SW_CHAN_NOTIFY) { 72 object->notify_pipe = new Pipe(true); 73 if (!object->notify_pipe->ready()) { 74 swoole_warning("notify_fd init failed"); 75 delete object->notify_pipe; 76 return nullptr; 77 } 78 } 79 80 return object; 81 } 82 83 /** 84 * push data(no lock) 85 */ 86 int Channel::in(const void *in_data, int data_length) { 87 assert(data_length <= maxlen); 88 if (full()) { 89 return SW_ERR; 90 } 91 ChannelSlice *item; 92 int msize = sizeof(item->length) + data_length; 93 94 if (tail < head) { 95 // no enough memory space 96 if ((head - tail) < msize) { 97 return SW_ERR; 98 } 99 item = (ChannelSlice *) ((char *) mem + tail); 100 tail += msize; 101 } else { 102 item = (ChannelSlice *) ((char *) mem + tail); 103 tail += msize; 104 if (tail >= (off_t) size) { 105 tail = 0; 106 tail_tag = 1 - tail_tag; 107 } 108 } 109 num++; 110 bytes += data_length; 111 item->length = data_length; 112 memcpy(item->data, in_data, data_length); 113 return SW_OK; 114 } 115 116 /** 117 * pop data(no lock) 118 */ 119 int Channel::out(void *out_buf, int buffer_length) { 120 if (empty()) { 121 return SW_ERR; 122 } 123 124 ChannelSlice *item = (ChannelSlice *) ((char *) mem + head); 125 assert(buffer_length >= item->length); 126 memcpy(out_buf, item->data, item->length); 127 head += (item->length + sizeof(item->length)); 128 if (head >= (off_t) size) { 129 head = 0; 130 head_tag = 1 - head_tag; 131 } 132 num--; 133 bytes -= item->length; 134 return item->length; 135 } 136 137 /** 138 * peek data 139 */ 140 int Channel::peek(void *out, int buffer_length) { 141 if (empty()) { 142 return SW_ERR; 143 } 144 145 int length; 146 lock->lock(); 147 ChannelSlice *item = (ChannelSlice *) ((char *) mem + head); 148 assert(buffer_length >= item->length); 149 memcpy(out, item->data, item->length); 150 length = item->length; 151 lock->unlock(); 152 153 return length; 154 } 155 156 /** 157 * wait notify 158 */ 159 int Channel::wait() { 160 assert(flags & SW_CHAN_NOTIFY); 161 uint64_t value; 162 return notify_pipe->read(&value, sizeof(value)); 163 } 164 165 /** 166 * new data coming, notify to customer 167 */ 168 int Channel::notify() { 169 assert(flags & SW_CHAN_NOTIFY); 170 uint64_t value = 1; 171 return notify_pipe->write(&value, sizeof(value)); 172 } 173 174 /** 175 * push data (lock) 176 */ 177 int Channel::push(const void *in_data, int data_length) { 178 assert(flags & SW_CHAN_LOCK); 179 lock->lock(); 180 int ret = in(in_data, data_length); 181 lock->unlock(); 182 return ret; 183 } 184 185 /** 186 * free channel 187 */ 188 void Channel::destroy() { 189 if (flags & SW_CHAN_LOCK) { 190 delete lock; 191 } 192 if (flags & SW_CHAN_NOTIFY) { 193 notify_pipe->close(); 194 delete notify_pipe; 195 } 196 if (flags & SW_CHAN_SHM) { 197 sw_shm_free(this); 198 } else { 199 sw_free(this); 200 } 201 } 202 203 /** 204 * pop data (lock) 205 */ 206 int Channel::pop(void *out_buf, int buffer_length) { 207 assert(flags & SW_CHAN_LOCK); 208 lock->lock(); 209 int n = out(out_buf, buffer_length); 210 lock->unlock(); 211 return n; 212 } 213 214 void Channel::print() { 215 printf("Channel\n{\n" 216 " off_t head = %ld;\n" 217 " off_t tail = %ld;\n" 218 " size_t size = %ld;\n" 219 " char head_tag = %d;\n" 220 " char tail_tag = %d;\n" 221 " int num = %d;\n" 222 " size_t bytes = %ld;\n" 223 " int flag = %d;\n" 224 " int maxlen = %d;\n" 225 "\n}\n", 226 (long) head, 227 (long) tail, 228 size, 229 tail_tag, 230 head_tag, 231 num, 232 bytes, 233 flags, 234 maxlen); 235 } 236 237 } // namespace swoole 238