1 /*
2   +----------------------------------------------------------------------+
3   | Swoole                                                               |
4   +----------------------------------------------------------------------+
5   | This source file is subject to version 2.0 of the Apache license,    |
6   | that is bundled with this package in the file LICENSE, and is        |
7   | available through the world-wide-web at the following url:           |
8   | http://www.apache.org/licenses/LICENSE-2.0.html                      |
9   | If you did not receive a copy of the Apache2.0 license and are unable|
10   | to obtain it through the world-wide-web, please send a note to       |
11   | license@swoole.com so we can mail you a copy immediately.            |
12   +----------------------------------------------------------------------+
13   | @link     https://www.swoole.com/                                    |
14   | @contact  team@swoole.com                                            |
15   | @license  https://github.com/swoole/swoole-src/blob/master/LICENSE   |
16   | @author   Tianfeng Han  <mikan.tenny@gmail.com>                      |
17   +----------------------------------------------------------------------+
18 */
19 
20 #include "test_core.h"
21 
22 #include "swoole_server.h"
23 #include "swoole_memory.h"
24 #include "swoole_signal.h"
25 #include "swoole_lock.h"
26 
27 using namespace std;
28 using namespace swoole;
29 
// Size argument used for test packet 1 (see MB_SEND(1, DATA_SIZE) in the tests below).
// NOTE(review): presumably 2,000,000, i.e. far larger than the 65536-byte bus buffer the
// tests configure — confirm the value of SW_NUM_MILLION.
constexpr int DATA_SIZE = 2 * SW_NUM_MILLION;
31 
// One message collected from the bus by TestMB::read().
// Kept as a plain aggregate: TestMB::read() brace-initializes it in field order.
struct TestPacket {
    // Copied from the `info.fd` of the bus buffer at the time the packet was read.
    SessionId fd;
    // Owned copy of the packet payload (packet.data, packet.length).
    std::string data;
};
36 
37 struct TestMB {
38     std::vector<TestPacket> q;
39     MessageBus mb;
40     std::function<ssize_t(network::Socket *)> read_func;
41 
send_empty_packetTestMB42     bool send_empty_packet(network::Socket *sock) {
43         SendData _data4;
44         _data4.data = "hello world";
45         _data4.info.fd = 4;
46         _data4.info.len = 0;
47         if (!mb.write(sock, &_data4)) {
48             return false;
49         }
50 
51         SendData _data5;
52         _data5.data = nullptr;
53         _data5.info.fd = 5;
54         _data5.info.len = 10;
55         if (!mb.write(sock, &_data5)) {
56             return false;
57         }
58 
59         return true;
60     }
61 
readTestMB62     int read(Event *ev) {
63         auto retval = read_func(ev->socket);
64         if (retval == 0) {
65             return SW_OK;
66         } else if (retval < 0) {
67             swoole_event_del(ev->socket);
68             return SW_ERR;
69         }
70 
71         auto packet = mb.get_packet();
72 
73         q.push_back(TestPacket{
74             mb.get_buffer()->info.fd,
75             std::string(packet.data, packet.length),
76         });
77 
78         if (q.size() == 5) {
79             swoole_event_del(ev->socket);
80         }
81 
82         return SW_OK;
83     }
84 };
85 
// MB_SEND(i, s): builds and sends test packet number `i`.
//   - Declares `pkt<i>`, a String constructed with size argument `s`, appends
//     `pkt<i>.size - 1` random bytes followed by a single '\0'.
//   - Declares a value-initialized SendData `_data<i>` pointing at that buffer,
//     with `info.fd = i` and `info.len` set to the String's length.
//   - Writes it through `tmb.mb` to `p.get_socket(true)` and asserts success.
// Requires `tmb` (TestMB) and `p` (the socket pair) in the calling scope, and leaves
// `pkt<i>` in scope so that MB_ASSERT(i) can compare the received data against it.
// Comments are kept outside the body because every body line ends in a continuation.
#define MB_SEND(i, s)                                                                                                  \
    String pkt##i(s);                                                                                                  \
    pkt##i.append_random_bytes(pkt##i.size - 1, false);                                                                \
    pkt##i.append('\0');                                                                                               \
                                                                                                                       \
    SendData _data##i{};                                                                                               \
    _data##i.data = pkt##i.value();                                                                                    \
    _data##i.info.fd = i;                                                                                              \
    _data##i.info.len = pkt##i.get_length();                                                                           \
    ASSERT_TRUE(tmb.mb.write(p.get_socket(true), &_data##i));
96 
// MB_ASSERT(i): checks that the i-th received packet (tmb.q[i - 1]) matches the
// packet sent by MB_SEND(i, ...). Requires `tmb` and `pkt<i>` in the calling scope.
//   - Binds `r<i>` by const reference to avoid copying the (possibly multi-megabyte) payload.
//   - Compares the full length and every byte rather than using a C-string comparison:
//     the payload is random bytes and may contain embedded NULs, at which a
//     C-string comparison would stop and leave the remainder unchecked.
// Comments are kept outside the body because every body line ends in a continuation.
#define MB_ASSERT(i)                                                                                                   \
    const auto &r##i = tmb.q.at(i - 1);                                                                                \
    ASSERT_EQ(r##i.fd, i);                                                                                             \
    ASSERT_EQ(r##i.data.length(), pkt##i.get_length());                                                                \
    ASSERT_TRUE(r##i.data.compare(0, std::string::npos, pkt##i.value(), pkt##i.get_length()) == 0);
101 
// Sends five messages over a SOCK_STREAM socket pair and verifies that
// MessageBus::read() delivers them in order with the expected fd and payload.
TEST(message_bus, read) {
    UnixSocket p(true, SOCK_STREAM);
    ASSERT_TRUE(p.ready());

    ASSERT_EQ(swoole_event_init(SW_EVENTLOOP_WAIT_EXIT), SW_OK);
    p.set_blocking(false);
    p.set_buffer_size(65536);

    uint64_t msg_id = 0;

    TestMB tmb{};
    tmb.mb.set_buffer_size(65536);
    // Monotonically increasing message ids, local to this test.
    tmb.mb.set_id_generator([&msg_id]() { return msg_id++; });
    tmb.mb.alloc_buffer();

    // Read path under test.
    tmb.read_func = [&tmb](network::Socket *sock) {
        return tmb.mb.read(sock);
    };

    // The reactor callback below is a captureless lambda; it reaches the fixture through `ptr`.
    sw_reactor()->ptr = &tmb;

    ASSERT_EQ(swoole_event_add(p.get_socket(false), SW_EVENT_READ), SW_OK);

    swoole_event_set_handler(SW_FD_PIPE | SW_EVENT_READ, [](Reactor *reactor, Event *ev) -> int {
        auto *fixture = static_cast<TestMB *>(reactor->ptr);
        return fixture->read(ev);
    });

    // Three payload sizes: DATA_SIZE, exactly the bus buffer size, and a small one.
    MB_SEND(1, DATA_SIZE);
    MB_SEND(2, tmb.mb.get_buffer_size());
    MB_SEND(3, 2341);

    // Fail here if either payload-less message could not be written, instead of
    // surfacing later as an out_of_range from q.at().
    ASSERT_TRUE(tmb.send_empty_packet(p.get_socket(true)));

    // Runs until TestMB::read() has collected all five packets and removed the socket.
    ASSERT_EQ(swoole_event_wait(), SW_OK);

    MB_ASSERT(1);
    MB_ASSERT(2);
    MB_ASSERT(3);

    // Packet 4 was sent with len == 0: expect an empty payload.
    const auto &r4 = tmb.q.at(3);
    ASSERT_EQ(r4.fd, 4);
    ASSERT_STREQ(r4.data.c_str(), "");

    // Packet 5 was sent with a null data pointer: expect an empty payload.
    const auto &r5 = tmb.q.at(4);
    ASSERT_EQ(r5.fd, 5);
    ASSERT_STREQ(r5.data.c_str(), "");
}
150 
// Sends five messages over a SOCK_DGRAM socket pair and verifies that
// MessageBus::read_with_buffer() delivers them in order with the expected fd and payload.
TEST(message_bus, read_with_buffer) {
    UnixSocket p(true, SOCK_DGRAM);
    ASSERT_TRUE(p.ready());

    ASSERT_EQ(swoole_event_init(SW_EVENTLOOP_WAIT_EXIT), SW_OK);
    p.set_blocking(false);
    p.set_buffer_size(65536);

    uint64_t msg_id = 0;

    TestMB tmb{};
    tmb.mb.set_buffer_size(65536);
    // Monotonically increasing message ids, local to this test.
    tmb.mb.set_id_generator([&msg_id]() { return msg_id++; });
    tmb.mb.alloc_buffer();

    // Read path under test.
    tmb.read_func = [&tmb](network::Socket *sock) {
        return tmb.mb.read_with_buffer(sock);
    };

    // The reactor callback below is a captureless lambda; it reaches the fixture through `ptr`.
    sw_reactor()->ptr = &tmb;

    ASSERT_EQ(swoole_event_add(p.get_socket(false), SW_EVENT_READ), SW_OK);

    swoole_event_set_handler(SW_FD_PIPE | SW_EVENT_READ, [](Reactor *reactor, Event *ev) -> int {
        auto *fixture = static_cast<TestMB *>(reactor->ptr);
        return fixture->read(ev);
    });

    // Three payload sizes: DATA_SIZE, exactly the bus buffer size, and a small one.
    MB_SEND(1, DATA_SIZE);
    MB_SEND(2, tmb.mb.get_buffer_size());
    MB_SEND(3, 2341);

    // Fail here if either payload-less message could not be written, instead of
    // surfacing later as an out_of_range from q.at().
    ASSERT_TRUE(tmb.send_empty_packet(p.get_socket(true)));

    // Runs until TestMB::read() has collected all five packets and removed the socket.
    ASSERT_EQ(swoole_event_wait(), SW_OK);

    MB_ASSERT(1);
    MB_ASSERT(2);
    MB_ASSERT(3);

    // Packet 4 was sent with len == 0: expect an empty payload.
    const auto &r4 = tmb.q.at(3);
    ASSERT_EQ(r4.fd, 4);
    ASSERT_STREQ(r4.data.c_str(), "");

    // Packet 5 was sent with a null data pointer: expect an empty payload.
    const auto &r5 = tmb.q.at(4);
    ASSERT_EQ(r5.fd, 5);
    ASSERT_STREQ(r5.data.c_str(), "");
}
199