1
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <future>
5 #include <thread>
6 #include <chrono>
7 #include <vector>
8 #include <functional>
9 #ifdef _WIN32
10 #define usleep(x) Sleep(x / 1000)
11 #else
12 #include <unistd.h>
13 #endif
14
15 #if ENABLE_EXPERIMENTAL_BONDING
16
17 #include "gtest/gtest.h"
18
19 #include "srt.h"
20 #include "netinet_any.h"
21
TEST(Bonding,SRTConnectGroup)22 TEST(Bonding, SRTConnectGroup)
23 {
24 struct sockaddr_in sa;
25
26 srt_startup();
27
28 const int ss = srt_create_group(SRT_GTYPE_BROADCAST);
29 ASSERT_NE(ss, SRT_ERROR);
30
31 std::vector<SRT_SOCKGROUPCONFIG> targets;
32 for (int i = 0; i < 2; ++i)
33 {
34 sa.sin_family = AF_INET;
35 sa.sin_port = htons(4200 + i);
36 ASSERT_EQ(inet_pton(AF_INET, "192.168.1.237", &sa.sin_addr), 1);
37
38 const SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(NULL, (struct sockaddr*)&sa, sizeof sa);
39 targets.push_back(gd);
40 }
41
42 std::future<void> closing_promise = std::async(std::launch::async, [](int ss) {
43 std::this_thread::sleep_for(std::chrono::seconds(2));
44 std::cerr << "Closing group" << std::endl;
45 srt_close(ss);
46 }, ss);
47
48 std::cout << "srt_connect_group calling " << std::endl;
49 const int st = srt_connect_group(ss, targets.data(), targets.size());
50 std::cout << "srt_connect_group returned " << st << std::endl;
51
52 closing_promise.wait();
53 // Delete config objects before prospective exception
54 for (auto& gd: targets)
55 srt_delete_config(gd.config);
56
57 int res = srt_close(ss);
58 if (res == SRT_ERROR)
59 {
60 std::cerr << "srt_close: " << srt_getlasterror_str() << std::endl;
61 }
62
63 srt_cleanup();
64 }
65
66
listening_thread(bool should_read)67 void listening_thread(bool should_read)
68 {
69 const SRTSOCKET server_sock = srt_create_socket();
70 sockaddr_in bind_sa;
71 memset(&bind_sa, 0, sizeof bind_sa);
72 bind_sa.sin_family = AF_INET;
73 ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &bind_sa.sin_addr), 1);
74 bind_sa.sin_port = htons(4200);
75
76 ASSERT_NE(srt_bind(server_sock, (sockaddr*)&bind_sa, sizeof bind_sa), -1);
77 const int yes = 1;
78 srt_setsockflag(server_sock, SRTO_GROUPCONNECT, &yes, sizeof yes);
79
80 const int no = 1;
81 srt_setsockflag(server_sock, SRTO_RCVSYN, &no, sizeof no);
82
83 const int eid = srt_epoll_create();
84 const int listen_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;
85 srt_epoll_add_usock(eid, server_sock, &listen_event);
86
87 ASSERT_NE(srt_listen(server_sock, 5), -1);
88 std::cout << "Listen: wait for acceptability\n";
89 int fds[2];
90 int fds_len = 2;
91 int ers[2];
92 int ers_len = 2;
93 int wr = srt_epoll_wait(eid, fds, &fds_len, ers, &ers_len, 5000,
94 0, 0, 0, 0);
95
96 ASSERT_NE(wr, -1);
97 std::cout << "Listen: reported " << fds_len << " acceptable and " << ers_len << " errors\n";
98 ASSERT_GT(fds_len, 0);
99 ASSERT_EQ(fds[0], server_sock);
100
101 sockaddr_any scl;
102 int acp = srt_accept(server_sock, (scl.get()), (&scl.len));
103 ASSERT_NE(acp & SRTGROUP_MASK, 0);
104
105 if (should_read)
106 {
107 std::cout << "Listener will read packets...\n";
108 // Read everything until closed
109 int n = 0;
110 for (;;)
111 {
112 char buf[1500];
113 int rd = srt_recv(acp, buf, 1500);
114 if (rd == -1)
115 {
116 std::cout << "Listener read " << n << " packets, stopping\n";
117 break;
118 }
119 ++n;
120 }
121 }
122
123 srt_close(acp);
124
125 std::cout << "Listen: wait 7 seconds\n";
126 std::this_thread::sleep_for(std::chrono::seconds(7));
127 // srt_accept..
128 }
129
ConnectCallback(void *,SRTSOCKET sock,int error,const sockaddr *,int token)130 void ConnectCallback(void* /*opaq*/, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token)
131 {
132 std::cout << "Connect callback. Socket: " << sock
133 << ", error: " << error
134 << ", token: " << token << '\n';
135 }
136
TEST(Bonding,NonBlockingGroupConnect)137 TEST(Bonding, NonBlockingGroupConnect)
138 {
139 srt_startup();
140
141 const int ss = srt_create_group(SRT_GTYPE_BROADCAST);
142 ASSERT_NE(ss, SRT_ERROR);
143 std::cout << "Created group socket: " << ss << '\n';
144
145 int no = 0;
146 ASSERT_NE(srt_setsockopt(ss, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode
147 ASSERT_NE(srt_setsockopt(ss, 0, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode
148
149 const int poll_id = srt_epoll_create();
150 // Will use this epoll to wait for srt_accept(...)
151 const int epoll_out = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
152 ASSERT_NE(srt_epoll_add_usock(poll_id, ss, &epoll_out), SRT_ERROR);
153
154 srt_connect_callback(ss, &ConnectCallback, this);
155
156 sockaddr_in sa;
157 sa.sin_family = AF_INET;
158 sa.sin_port = htons(4200);
159 ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1);
160
161 sockaddr_in safail = sa;
162 safail.sin_port = htons(4201); // port where we have no listener
163
164 std::future<void> listen_promise = std::async(std::launch::async, std::bind(&listening_thread, false));
165
166 std::cout << "Connecting two sockets " << std::endl;
167 {
168 const int sockid = srt_connect(ss, (sockaddr*) &sa, sizeof sa);
169 EXPECT_GT(sockid, 0) << "Socket " << 1;
170 sa.sin_port = htons(4201); // Changing port so that second connect fails
171 std::cout << "Socket created: " << sockid << '\n';
172 ASSERT_NE(srt_epoll_add_usock(poll_id, sockid, &epoll_out), SRT_ERROR);
173 }
174 {
175 const int sockid = srt_connect(ss, (sockaddr*) &safail, sizeof safail);
176 EXPECT_GT(sockid, 0) << "Socket " << 2;
177 safail.sin_port = htons(4201); // Changing port so that second connect fails
178 std::cout << "Socket created: " << sockid << '\n';
179 ASSERT_NE(srt_epoll_add_usock(poll_id, sockid, &epoll_out), SRT_ERROR);
180 }
181 std::cout << "Returned from connecting two sockets " << std::endl;
182
183 const int default_len = 3;
184 int rlen = default_len;
185 SRTSOCKET read[default_len];
186
187 int wlen = default_len;
188 SRTSOCKET write[default_len];
189
190 for (int j = 0; j < 2; ++j)
191 {
192 const int epoll_res = srt_epoll_wait(poll_id, read, &rlen,
193 write, &wlen,
194 5000, /* timeout */
195 0, 0, 0, 0);
196
197 std::cout << "Epoll result: " << epoll_res << '\n';
198 std::cout << "Epoll rlen: " << rlen << ", wlen: " << wlen << '\n';
199 for (int i = 0; i < rlen; ++i)
200 {
201 std::cout << "Epoll read[" << i << "]: " << read[i] << '\n';
202 }
203 for (int i = 0; i < wlen; ++i)
204 {
205 std::cout << "Epoll write[" << i << "]: " << write[i] << " (removed from epoll)\n";
206 EXPECT_EQ(srt_epoll_remove_usock(poll_id, write[i]), 0);
207 }
208 }
209
210 listen_promise.wait();
211
212 EXPECT_EQ(srt_close(ss), 0) << "srt_close: %s\n" << srt_getlasterror_str();
213
214 srt_cleanup();
215 }
216
ConnectCallback_Close(void *,SRTSOCKET sock,int error,const sockaddr *,int token)217 void ConnectCallback_Close(void* /*opaq*/, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token)
218 {
219 std::cout << "Connect callback. Socket: " << sock
220 << ", error: " << error
221 << ", token: " << token << '\n';
222
223 if (error == SRT_SUCCESS)
224 return;
225
226 // XXX WILL CAUSE DEADLOCK!
227 srt_close(sock);
228 }
229
TEST(Bonding,CloseGroupAndSocket)230 TEST(Bonding, CloseGroupAndSocket)
231 {
232 srt_startup();
233
234 const int ss = srt_create_group(SRT_GTYPE_BROADCAST);
235 ASSERT_NE(ss, SRT_ERROR);
236 std::cout << "Created group socket: " << ss << '\n';
237
238 int no = 0;
239 ASSERT_NE(srt_setsockopt(ss, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode
240 ASSERT_NE(srt_setsockopt(ss, 0, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode
241
242 const int poll_id = srt_epoll_create();
243 // Will use this epoll to wait for srt_accept(...)
244 const int epoll_out = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
245 ASSERT_NE(srt_epoll_add_usock(poll_id, ss, &epoll_out), SRT_ERROR);
246
247 srt_connect_callback(ss, &ConnectCallback_Close, this);
248
249 sockaddr_in sa;
250 sa.sin_family = AF_INET;
251 sa.sin_port = htons(4200);
252 ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1);
253
254 std::future<void> listen_promise = std::async(std::launch::async, std::bind(listening_thread, true));
255
256 std::cout << "Connecting two sockets " << std::endl;
257 for (int i = 0; i < 2; ++i)
258 {
259 const int sockid = srt_connect(ss, (sockaddr*) &sa, sizeof sa);
260 EXPECT_GT(sockid, 0) << "Socket " << i;
261 sa.sin_port = htons(4201); // Changing port so that second connect fails
262 std::cout << "Socket created: " << sockid << '\n';
263 ASSERT_NE(srt_epoll_add_usock(poll_id, sockid, &epoll_out), SRT_ERROR);
264 }
265 std::cout << "Returned from connecting two sockets " << std::endl;
266
267 const int default_len = 3;
268 int rlen = default_len;
269 SRTSOCKET read[default_len];
270
271 int wlen = default_len;
272 SRTSOCKET write[default_len];
273
274 for (int j = 0; j < 2; ++j)
275 {
276 const int epoll_res = srt_epoll_wait(poll_id, read, &rlen,
277 write, &wlen,
278 5000, /* timeout */
279 0, 0, 0, 0);
280
281 std::cout << "Epoll result: " << epoll_res << '\n';
282 std::cout << "Epoll rlen: " << rlen << ", wlen: " << wlen << '\n';
283 for (int i = 0; i < rlen; ++i)
284 {
285 std::cout << "Epoll read[" << i << "]: " << read[i] << '\n';
286 }
287 for (int i = 0; i < wlen; ++i)
288 {
289 std::cout << "Epoll write[" << i << "]: " << write[i] << " (removed from epoll)\n";
290 EXPECT_EQ(srt_epoll_remove_usock(poll_id, write[i]), 0);
291 }
292 }
293
294 // Some basic checks for group stats
295 SRT_TRACEBSTATS stats;
296 EXPECT_EQ(srt_bstats(ss, &stats, true), SRT_SUCCESS);
297 EXPECT_EQ(stats.pktSent, 0);
298 EXPECT_EQ(stats.pktSentTotal, 0);
299 EXPECT_EQ(stats.pktSentUnique, 0);
300 EXPECT_EQ(stats.pktSentUniqueTotal, 0);
301 EXPECT_EQ(stats.pktRecv, 0);
302 EXPECT_EQ(stats.pktRecvTotal, 0);
303 EXPECT_EQ(stats.pktRecvUnique, 0);
304 EXPECT_EQ(stats.pktRecvUniqueTotal, 0);
305 EXPECT_EQ(stats.pktRcvDrop, 0);
306 EXPECT_EQ(stats.pktRcvDropTotal, 0);
307
308 std::cout << "Starting thread for sending:\n";
309 std::thread sender([ss] {
310 char buf[1316];
311 memset(buf, 1, sizeof(buf));
312 int n = 0;
313 for (int i = 0; i < 10000; ++i)
314 {
315 std::this_thread::sleep_for(std::chrono::milliseconds(10));
316 if (srt_send(ss, buf, 1316) == -1)
317 {
318 std::cout << "[Sender] sending failure, exitting after sending " << n << " packets\n";
319 break;
320 }
321
322 ++n;
323 }
324 });
325
326 std::cout << "Will close sending in 300ms...\n";
327
328 std::this_thread::sleep_for(std::chrono::milliseconds(300));
329
330 EXPECT_EQ(srt_close(ss), 0) << "srt_close: %s\n" << srt_getlasterror_str();
331
332 std::cout << "CLOSED GROUP. Now waiting for sender to exit...\n";
333 sender.join();
334 listen_promise.wait();
335
336 srt_cleanup();
337 }
338
339 #endif // ENABLE_EXPERIMENTAL_BONDING
340