1 /*
2 +----------------------------------------------------------------------+
3 | Swoole |
4 +----------------------------------------------------------------------+
5 | This source file is subject to version 2.0 of the Apache license, |
6 | that is bundled with this package in the file LICENSE, and is |
7 | available through the world-wide-web at the following url: |
8 | http://www.apache.org/licenses/LICENSE-2.0.html |
9 | If you did not receive a copy of the Apache2.0 license and are unable|
10 | to obtain it through the world-wide-web, please send a note to |
11 | license@swoole.com so we can mail you a copy immediately. |
12 +----------------------------------------------------------------------+
13 | @link https://www.swoole.com/ |
14 | @contact team@swoole.com |
15 | @license https://github.com/swoole/swoole-src/blob/master/LICENSE |
16 | @author Tianfeng Han <mikan.tenny@gmail.com> |
17 +----------------------------------------------------------------------+
18 */
19
20 #include "test_core.h"
21
22 #include "swoole_server.h"
23 #include "swoole_memory.h"
24 #include "swoole_signal.h"
25 #include "swoole_lock.h"
26
27 using namespace std;
28 using namespace swoole;
29
30 constexpr int DATA_SIZE = 2 * SW_NUM_MILLION;
31
32 struct TestPacket {
33 SessionId fd;
34 std::string data;
35 };
36
37 struct TestMB {
38 std::vector<TestPacket> q;
39 MessageBus mb;
40 std::function<ssize_t(network::Socket *)> read_func;
41
send_empty_packetTestMB42 bool send_empty_packet(network::Socket *sock) {
43 SendData _data4;
44 _data4.data = "hello world";
45 _data4.info.fd = 4;
46 _data4.info.len = 0;
47 if (!mb.write(sock, &_data4)) {
48 return false;
49 }
50
51 SendData _data5;
52 _data5.data = nullptr;
53 _data5.info.fd = 5;
54 _data5.info.len = 10;
55 if (!mb.write(sock, &_data5)) {
56 return false;
57 }
58
59 return true;
60 }
61
readTestMB62 int read(Event *ev) {
63 auto retval = read_func(ev->socket);
64 if (retval == 0) {
65 return SW_OK;
66 } else if (retval < 0) {
67 swoole_event_del(ev->socket);
68 return SW_ERR;
69 }
70
71 auto packet = mb.get_packet();
72
73 q.push_back(TestPacket{
74 mb.get_buffer()->info.fd,
75 std::string(packet.data, packet.length),
76 });
77
78 if (q.size() == 5) {
79 swoole_event_del(ev->socket);
80 }
81
82 return SW_OK;
83 }
84 };
85
/*
 * MB_SEND(i, s): declares `String pkt<i>` constructed with `s`, appends
 * (pkt<i>.size - 1) random bytes followed by a '\0', and writes the result
 * through tmb.mb to p.get_socket(true) with info.fd set to `i`.
 *
 * Expects `tmb` (a TestMB) and `p` (the socket pair) in the enclosing scope.
 * The write is checked with ASSERT_TRUE, so the macro may only be used where
 * gtest fatal assertions are allowed. pkt<i> stays in scope for MB_ASSERT(i).
 */
#define MB_SEND(i, s)                                                                                                  \
    String pkt##i(s);                                                                                                  \
    pkt##i.append_random_bytes(pkt##i.size - 1, false);                                                                \
    pkt##i.append('\0');                                                                                               \
                                                                                                                       \
    SendData _data##i{};                                                                                               \
    _data##i.info.fd = i;                                                                                              \
    _data##i.info.len = pkt##i.get_length();                                                                           \
    _data##i.data = pkt##i.value();                                                                                    \
    ASSERT_TRUE(tmb.mb.write(p.get_socket(true), &_data##i));
96
/*
 * MB_ASSERT(i): checks that the i-th received packet, tmb.q.at(i - 1), has
 * fd == i and that its data matches pkt<i> as declared by MB_SEND(i, ...).
 *
 * The packet is bound by const reference so the payload is not copied just to
 * be inspected. ASSERT_STREQ compares C strings, so the payload comparison
 * ends at the first '\0' byte.
 */
#define MB_ASSERT(i)                                                                                                   \
    const auto &r##i = tmb.q.at((i) - 1);                                                                              \
    ASSERT_EQ(r##i.fd, i);                                                                                             \
    ASSERT_STREQ(r##i.data.c_str(), pkt##i.value());
101
/**
 * Sends three random-payload packets and two empty ones over a SOCK_STREAM
 * UnixSocket and verifies that MessageBus::read() delivers all five with the
 * expected fd and data.
 */
TEST(message_bus, read) {
    UnixSocket p(true, SOCK_STREAM);
    ASSERT_TRUE(p.ready());

    ASSERT_EQ(swoole_event_init(SW_EVENTLOOP_WAIT_EXIT), SW_OK);
    p.set_blocking(false);
    p.set_buffer_size(65536);

    uint64_t msg_id = 0;

    TestMB tmb{};
    tmb.mb.set_buffer_size(65536);
    tmb.mb.set_id_generator([&msg_id]() { return msg_id++; });
    tmb.mb.alloc_buffer();

    // Routine under test: MessageBus::read().
    tmb.read_func = [&tmb](network::Socket *sock) { return tmb.mb.read(sock); };

    // The event handler below is a capture-less lambda; it finds the fixture through reactor->ptr.
    sw_reactor()->ptr = &tmb;

    ASSERT_EQ(swoole_event_add(p.get_socket(false), SW_EVENT_READ), SW_OK);

    swoole_event_set_handler(SW_FD_PIPE | SW_EVENT_READ, [](Reactor *reactor, Event *ev) -> int {
        return static_cast<TestMB *>(reactor->ptr)->read(ev);
    });

    // Payload sizes: DATA_SIZE, exactly the bus buffer size, and 2341 bytes.
    MB_SEND(1, DATA_SIZE);
    MB_SEND(2, tmb.mb.get_buffer_size());
    MB_SEND(3, 2341);

    // A failed write here would otherwise only show up later as missing packets.
    ASSERT_TRUE(tmb.send_empty_packet(p.get_socket(true)));

    ASSERT_EQ(swoole_event_wait(), SW_OK);

    // Five packets were sent; check the count before indexing into the queue.
    ASSERT_EQ(tmb.q.size(), static_cast<size_t>(5));

    MB_ASSERT(1);
    MB_ASSERT(2);
    MB_ASSERT(3);

    const auto &r4 = tmb.q.at(3);
    ASSERT_EQ(r4.fd, 4);
    ASSERT_STREQ(r4.data.c_str(), "");

    const auto &r5 = tmb.q.at(4);
    ASSERT_EQ(r5.fd, 5);
    ASSERT_STREQ(r5.data.c_str(), "");
}
150
/**
 * Sends three random-payload packets and two empty ones over a SOCK_DGRAM
 * UnixSocket and verifies that MessageBus::read_with_buffer() delivers all
 * five with the expected fd and data.
 */
TEST(message_bus, read_with_buffer) {
    UnixSocket p(true, SOCK_DGRAM);
    ASSERT_TRUE(p.ready());

    ASSERT_EQ(swoole_event_init(SW_EVENTLOOP_WAIT_EXIT), SW_OK);
    p.set_blocking(false);
    p.set_buffer_size(65536);

    uint64_t msg_id = 0;

    TestMB tmb{};
    tmb.mb.set_buffer_size(65536);
    tmb.mb.set_id_generator([&msg_id]() { return msg_id++; });
    tmb.mb.alloc_buffer();

    // Routine under test: MessageBus::read_with_buffer().
    tmb.read_func = [&tmb](network::Socket *sock) { return tmb.mb.read_with_buffer(sock); };

    // The event handler below is a capture-less lambda; it finds the fixture through reactor->ptr.
    sw_reactor()->ptr = &tmb;

    ASSERT_EQ(swoole_event_add(p.get_socket(false), SW_EVENT_READ), SW_OK);

    swoole_event_set_handler(SW_FD_PIPE | SW_EVENT_READ, [](Reactor *reactor, Event *ev) -> int {
        return static_cast<TestMB *>(reactor->ptr)->read(ev);
    });

    // Payload sizes: DATA_SIZE, exactly the bus buffer size, and 2341 bytes.
    MB_SEND(1, DATA_SIZE);
    MB_SEND(2, tmb.mb.get_buffer_size());
    MB_SEND(3, 2341);

    // A failed write here would otherwise only show up later as missing packets.
    ASSERT_TRUE(tmb.send_empty_packet(p.get_socket(true)));

    ASSERT_EQ(swoole_event_wait(), SW_OK);

    // Five packets were sent; check the count before indexing into the queue.
    ASSERT_EQ(tmb.q.size(), static_cast<size_t>(5));

    MB_ASSERT(1);
    MB_ASSERT(2);
    MB_ASSERT(3);

    const auto &r4 = tmb.q.at(3);
    ASSERT_EQ(r4.fd, 4);
    ASSERT_STREQ(r4.data.c_str(), "");

    const auto &r5 = tmb.q.at(4);
    ASSERT_EQ(r5.fd, 5);
    ASSERT_STREQ(r5.data.c_str(), "");
}
199