1 /*
2 +----------------------------------------------------------------------+
3 | Swoole |
4 +----------------------------------------------------------------------+
5 | This source file is subject to version 2.0 of the Apache license, |
6 | that is bundled with this package in the file LICENSE, and is |
7 | available through the world-wide-web at the following url: |
8 | http://www.apache.org/licenses/LICENSE-2.0.html |
9 | If you did not receive a copy of the Apache2.0 license and are unable|
10 | to obtain it through the world-wide-web, please send a note to |
11 | license@swoole.com so we can mail you a copy immediately. |
12 +----------------------------------------------------------------------+
13 | @link https://www.swoole.com/ |
14 | @contact team@swoole.com |
15 | @license https://github.com/swoole/swoole-src/blob/master/LICENSE |
16 | @author Tianfeng Han <mikan.tenny@gmail.com> |
17 +----------------------------------------------------------------------+
18 */
19
20 #include "test_core.h"
21 #include "swoole_channel.h"
22
23 using namespace std;
24 using namespace swoole;
25
// Lower bound, in bytes, on the total random payload the channel test generates (10 MB).
constexpr int N = 10 * 1000 * 1000;
27
/**
 * Streams at least N bytes of random payload through a Channel, from a
 * producer thread to a consumer thread, and verifies that every message is
 * popped intact and in the same order it was pushed.
 */
TEST(channel, push) {
    auto *c = Channel::make(128 * 1024, 8192, SW_CHAN_LOCK | SW_CHAN_NOTIFY);
    // Fail fast rather than dereferencing a null channel inside the worker threads.
    ASSERT_TRUE(c != nullptr);

    map<int, string> m;

    size_t bytes = 0;
    int index = 0;

    // Build the reference data set: each message requests 4000..7999 random
    // bytes and is keyed by the order in which it will be pushed.
    while (bytes < N) {
        char buf[8000];
        int n = swoole_random_bytes(buf, (rand() % (sizeof(buf) / 2)) + (sizeof(buf) / 2));
        if (n <= 0) {
            swoole_trace("no enough data, n=%d, errno=%d\n", n, errno);
            continue;
        }
        m[index++] = string(buf, n);
        bytes += n;
    }

    swoole_trace("size=%zu", m.size());

    // From here on both workers only read the map. Going through a const
    // reference keeps std::map::operator[] (non-const, may insert) out of the
    // concurrent section, so the two threads never race on the container.
    const map<int, string> &messages = m;

    // Producer: push every message in key order.
    thread t1([&]() {
        for (const auto &item : messages) {
            const string &payload = item.second;
            // The channel is much smaller than the data set, so a push can be
            // rejected until the consumer frees space; back off briefly and retry.
            while (c->push(payload.c_str(), payload.length()) != SW_OK) {
                usleep(10);
            }
            swoole_trace("[PUSH] index=%d, size=%zu", item.first, payload.length());
        }
    });

    // Consumer: pop exactly as many messages as were generated and compare
    // each one with the reference copy stored under the same index.
    thread t2([&]() {
        char buf[8000];
        bool mismatch_reported = false;

        for (const auto &item : messages) {
            int retval;
            while ((retval = c->pop(buf, sizeof(buf))) <= 0) {
                usleep(10);
            }
            swoole_trace("[POP] index=%d, size=%d", item.first, retval);

            // A fatal ASSERT_* here would return from this lambda and leave the
            // producer spinning forever on a full channel. Record the first
            // mismatch as a non-fatal failure and keep draining so that both
            // threads terminate and the test fails instead of hanging.
            if (!mismatch_reported && item.second != string(buf, retval)) {
                mismatch_reported = true;
                ADD_FAILURE() << "message " << item.first << " differs from what was pushed: expected "
                              << item.second.length() << " bytes, got " << retval;
            }
        }
    });

    t1.join();
    t2.join();

    c->destroy();
}
89