1 /*
2   +----------------------------------------------------------------------+
3   | Swoole                                                               |
4   +----------------------------------------------------------------------+
5   | This source file is subject to version 2.0 of the Apache license,    |
6   | that is bundled with this package in the file LICENSE, and is        |
7   | available through the world-wide-web at the following url:           |
8   | http://www.apache.org/licenses/LICENSE-2.0.html                      |
9   | If you did not receive a copy of the Apache2.0 license and are unable|
10   | to obtain it through the world-wide-web, please send a note to       |
11   | license@swoole.com so we can mail you a copy immediately.            |
12   +----------------------------------------------------------------------+
13   | @link     https://www.swoole.com/                                    |
14   | @contact  team@swoole.com                                            |
15   | @license  https://github.com/swoole/swoole-src/blob/master/LICENSE   |
16   | @author   Tianfeng Han  <mikan.tenny@gmail.com>                      |
17   +----------------------------------------------------------------------+
18 */
19 
20 #include "test_core.h"
21 #include "swoole_channel.h"
22 
23 using namespace std;
24 using namespace swoole;
25 
// Minimum number of payload bytes the channel test generates, pushes and pops.
constexpr int N = 10 * 1000 * 1000;
27 
/**
 * Streams at least N bytes of random payloads through a Channel from a producer thread to a
 * consumer thread and verifies that every payload is popped intact and in push order.
 *
 * Verification results are collected by the consumer and asserted on the main thread after
 * both workers have been joined: a fatal assertion inside the consumer lambda would only
 * return from that lambda, leaving the producer spinning on a channel nobody drains.
 */
TEST(channel, push) {
    auto *c = Channel::make(128 * 1024, 8192, SW_CHAN_LOCK | SW_CHAN_NOTIFY);
    // Stop here instead of letting the worker threads dereference a null channel.
    ASSERT_TRUE(c != nullptr);

    const size_t target = N;
    map<int, string> m;

    size_t bytes = 0;
    int index = 0;

    // Fixture: random payloads keyed by their push order (0, 1, 2, ...).
    // Requested lengths are 4000..7999 bytes, i.e. the upper half of the buffer size.
    while (bytes < target) {
        char buf[8000];
        int n = swoole_random_bytes(buf, (rand() % (sizeof(buf) / 2)) + (sizeof(buf) / 2));
        if (n <= 0) {
            swoole_trace("no enough data, n=%d, errno=%d\n", n, errno);
            continue;
        }
        m[index++] = string(buf, n);
        bytes += n;
    }

    swoole_trace("size=%zu", m.size());

    // Verification results. Only the consumer thread writes them and the main thread reads
    // them after join(), so they need no additional synchronisation.
    size_t mismatches = 0;
    int first_mismatch = -1;

    // Producer: walks the fixture in key order and retries a payload until push succeeds.
    // Both workers only perform read-only lookups on `m` (no operator[]), so sharing the
    // map between them is safe.
    thread t1([&]() {
        auto next = m.cbegin();
        size_t sent = 0;

        while (sent < target && next != m.cend()) {
            const string &payload = next->second;
            if (c->push(payload.c_str(), payload.length()) == SW_OK) {
                swoole_trace("[PUSH] index=%d, size=%zu", next->first, payload.length());
                sent += payload.length();
                ++next;
            } else {
                // push was rejected (presumably the channel is full); let the consumer catch up.
                usleep(10);
            }
        }
    });

    // Consumer: pops until `target` bytes were received and compares each payload with the
    // fixture entry of the same sequence number.
    thread t2([&]() {
        char buf[8000];
        size_t received = 0;
        int seq = 0;

        while (received < target) {
            int retval = c->pop(buf, sizeof(buf));
            if (retval <= 0) {
                usleep(10);
                continue;
            }
            swoole_trace("[POP] index=%d, size=%d", seq, retval);

            auto expected = m.find(seq);
            if (expected == m.end() || expected->second != string(buf, retval)) {
                // Record the failure but keep draining, otherwise the producer would block forever.
                if (mismatches++ == 0) {
                    first_mismatch = seq;
                }
            }
            ++seq;
            received += retval;
        }
    });

    t1.join();
    t2.join();

    // Release the channel before asserting so it is freed even when the check fails.
    c->destroy();

    EXPECT_EQ(mismatches, 0u) << "first corrupted payload at index " << first_mismatch;
}
89