1 
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <future>
5 #include <thread>
6 #include <chrono>
7 #include <vector>
8 #include <functional>
9 #ifdef _WIN32
// Windows replacement for POSIX usleep(): converts microseconds to the
// milliseconds expected by Sleep(). The argument is parenthesized so that
// compound expressions such as usleep(a + b) are divided as a whole.
#define usleep(x) Sleep((x) / 1000)
11 #else
12 #include <unistd.h>
13 #endif
14 
15 #if ENABLE_EXPERIMENTAL_BONDING
16 
17 #include "gtest/gtest.h"
18 
19 #include "srt.h"
20 #include "netinet_any.h"
21 
// Exercises srt_connect_group() on a broadcast group while another thread
// closes the same group two seconds later.
//
// Two endpoints on 192.168.1.237 (ports 4200 and 4201) are prepared as the
// connection targets. The value returned by srt_connect_group() is only
// printed; the test does not assert on it.
TEST(Bonding, SRTConnectGroup)
{
    srt_startup();

    const int ss = srt_create_group(SRT_GTYPE_BROADCAST);
    ASSERT_NE(ss, SRT_ERROR);

    std::vector<SRT_SOCKGROUPCONFIG> targets;
    for (int i = 0; i < 2; ++i)
    {
        // Value-initialize so that no indeterminate bytes (e.g. padding
        // fields of sockaddr_in) are handed over to the library.
        sockaddr_in sa = sockaddr_in();
        sa.sin_family = AF_INET;
        sa.sin_port = htons(4200 + i);
        ASSERT_EQ(inet_pton(AF_INET, "192.168.1.237", &sa.sin_addr), 1);

        const SRT_SOCKGROUPCONFIG gd =
            srt_prepare_endpoint(NULL, reinterpret_cast<const sockaddr*>(&sa), sizeof sa);
        targets.push_back(gd);
    }

    // Close the group from a separate thread after a delay of two seconds,
    // while the main thread is inside srt_connect_group().
    std::future<void> closing_promise = std::async(std::launch::async, [](int group_id) {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cerr << "Closing group" << std::endl;
        srt_close(group_id);
    }, ss);

    std::cout << "srt_connect_group calling " << std::endl;
    const int st = srt_connect_group(ss, targets.data(), static_cast<int>(targets.size()));
    std::cout << "srt_connect_group returned " << st << std::endl;

    closing_promise.wait();
    // Delete config objects before prospective exception
    for (auto& gd: targets)
        srt_delete_config(gd.config);

    // The closing thread has already called srt_close() on this group by now,
    // so a failure here is only reported, not treated as a test failure.
    const int res = srt_close(ss);
    if (res == SRT_ERROR)
    {
        std::cerr << "srt_close: " << srt_getlasterror_str() << std::endl;
    }

    srt_cleanup();
}
65 
66 
listening_thread(bool should_read)67 void listening_thread(bool should_read)
68 {
69     const SRTSOCKET server_sock = srt_create_socket();
70     sockaddr_in bind_sa;
71     memset(&bind_sa, 0, sizeof bind_sa);
72     bind_sa.sin_family = AF_INET;
73     ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &bind_sa.sin_addr), 1);
74     bind_sa.sin_port = htons(4200);
75 
76     ASSERT_NE(srt_bind(server_sock, (sockaddr*)&bind_sa, sizeof bind_sa), -1);
77     const int yes = 1;
78     srt_setsockflag(server_sock, SRTO_GROUPCONNECT, &yes, sizeof yes);
79 
80     const int no = 1;
81     srt_setsockflag(server_sock, SRTO_RCVSYN, &no, sizeof no);
82 
83     const int eid = srt_epoll_create();
84     const int listen_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;
85     srt_epoll_add_usock(eid, server_sock, &listen_event);
86 
87     ASSERT_NE(srt_listen(server_sock, 5), -1);
88     std::cout << "Listen: wait for acceptability\n";
89     int fds[2];
90     int fds_len = 2;
91     int ers[2];
92     int ers_len = 2;
93     int wr = srt_epoll_wait(eid, fds, &fds_len, ers, &ers_len, 5000,
94             0, 0, 0, 0);
95 
96     ASSERT_NE(wr, -1);
97     std::cout << "Listen: reported " << fds_len << " acceptable and " << ers_len << " errors\n";
98     ASSERT_GT(fds_len, 0);
99     ASSERT_EQ(fds[0], server_sock);
100 
101     sockaddr_any scl;
102     int acp = srt_accept(server_sock, (scl.get()), (&scl.len));
103     ASSERT_NE(acp & SRTGROUP_MASK, 0);
104 
105     if (should_read)
106     {
107         std::cout << "Listener will read packets...\n";
108         // Read everything until closed
109         int n = 0;
110         for (;;)
111         {
112             char buf[1500];
113             int rd = srt_recv(acp, buf, 1500);
114             if (rd == -1)
115             {
116                 std::cout << "Listener read " << n << " packets, stopping\n";
117                 break;
118             }
119             ++n;
120         }
121     }
122 
123     srt_close(acp);
124 
125     std::cout << "Listen: wait 7 seconds\n";
126     std::this_thread::sleep_for(std::chrono::seconds(7));
127     // srt_accept..
128 }
129 
ConnectCallback(void *,SRTSOCKET sock,int error,const sockaddr *,int token)130 void ConnectCallback(void* /*opaq*/, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token)
131 {
132     std::cout << "Connect callback. Socket: " << sock
133         << ", error: " << error
134         << ", token: " << token << '\n';
135 }
136 
TEST(Bonding,NonBlockingGroupConnect)137 TEST(Bonding, NonBlockingGroupConnect)
138 {
139     srt_startup();
140 
141     const int ss = srt_create_group(SRT_GTYPE_BROADCAST);
142     ASSERT_NE(ss, SRT_ERROR);
143     std::cout << "Created group socket: " << ss << '\n';
144 
145     int no = 0;
146     ASSERT_NE(srt_setsockopt(ss, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode
147     ASSERT_NE(srt_setsockopt(ss, 0, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode
148 
149     const int poll_id = srt_epoll_create();
150     // Will use this epoll to wait for srt_accept(...)
151     const int epoll_out = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
152     ASSERT_NE(srt_epoll_add_usock(poll_id, ss, &epoll_out), SRT_ERROR);
153 
154     srt_connect_callback(ss, &ConnectCallback, this);
155 
156     sockaddr_in sa;
157     sa.sin_family = AF_INET;
158     sa.sin_port = htons(4200);
159     ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1);
160 
161     sockaddr_in safail = sa;
162     safail.sin_port = htons(4201); // port where we have no listener
163 
164     std::future<void> listen_promise = std::async(std::launch::async, std::bind(&listening_thread, false));
165 
166     std::cout << "Connecting two sockets " << std::endl;
167     {
168         const int sockid = srt_connect(ss, (sockaddr*) &sa, sizeof sa);
169         EXPECT_GT(sockid, 0) << "Socket " << 1;
170         sa.sin_port = htons(4201); // Changing port so that second connect fails
171         std::cout << "Socket created: " << sockid << '\n';
172         ASSERT_NE(srt_epoll_add_usock(poll_id, sockid, &epoll_out), SRT_ERROR);
173     }
174     {
175         const int sockid = srt_connect(ss, (sockaddr*) &safail, sizeof safail);
176         EXPECT_GT(sockid, 0) << "Socket " << 2;
177         safail.sin_port = htons(4201); // Changing port so that second connect fails
178         std::cout << "Socket created: " << sockid << '\n';
179         ASSERT_NE(srt_epoll_add_usock(poll_id, sockid, &epoll_out), SRT_ERROR);
180     }
181     std::cout << "Returned from connecting two sockets " << std::endl;
182 
183     const int default_len = 3;
184     int rlen = default_len;
185     SRTSOCKET read[default_len];
186 
187     int wlen = default_len;
188     SRTSOCKET write[default_len];
189 
190     for (int j = 0; j < 2; ++j)
191     {
192         const int epoll_res = srt_epoll_wait(poll_id, read, &rlen,
193             write, &wlen,
194             5000, /* timeout */
195             0, 0, 0, 0);
196 
197         std::cout << "Epoll result: " << epoll_res << '\n';
198         std::cout << "Epoll rlen: " << rlen << ", wlen: " << wlen << '\n';
199         for (int i = 0; i < rlen; ++i)
200         {
201             std::cout << "Epoll read[" << i << "]: " << read[i] << '\n';
202         }
203         for (int i = 0; i < wlen; ++i)
204         {
205             std::cout << "Epoll write[" << i << "]: " << write[i] << " (removed from epoll)\n";
206             EXPECT_EQ(srt_epoll_remove_usock(poll_id, write[i]), 0);
207         }
208     }
209 
210     listen_promise.wait();
211 
212     EXPECT_EQ(srt_close(ss), 0) << "srt_close: %s\n" << srt_getlasterror_str();
213 
214     srt_cleanup();
215 }
216 
ConnectCallback_Close(void *,SRTSOCKET sock,int error,const sockaddr *,int token)217 void ConnectCallback_Close(void* /*opaq*/, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token)
218 {
219     std::cout << "Connect callback. Socket: " << sock
220         << ", error: " << error
221         << ", token: " << token << '\n';
222 
223     if (error == SRT_SUCCESS)
224         return;
225 
226     // XXX WILL CAUSE DEADLOCK!
227     srt_close(sock);
228 }
229 
TEST(Bonding,CloseGroupAndSocket)230 TEST(Bonding, CloseGroupAndSocket)
231 {
232     srt_startup();
233 
234     const int ss = srt_create_group(SRT_GTYPE_BROADCAST);
235     ASSERT_NE(ss, SRT_ERROR);
236     std::cout << "Created group socket: " << ss << '\n';
237 
238     int no = 0;
239     ASSERT_NE(srt_setsockopt(ss, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode
240     ASSERT_NE(srt_setsockopt(ss, 0, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode
241 
242     const int poll_id = srt_epoll_create();
243     // Will use this epoll to wait for srt_accept(...)
244     const int epoll_out = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
245     ASSERT_NE(srt_epoll_add_usock(poll_id, ss, &epoll_out), SRT_ERROR);
246 
247     srt_connect_callback(ss, &ConnectCallback_Close, this);
248 
249     sockaddr_in sa;
250     sa.sin_family = AF_INET;
251     sa.sin_port = htons(4200);
252     ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1);
253 
254     std::future<void> listen_promise = std::async(std::launch::async, std::bind(listening_thread, true));
255 
256     std::cout << "Connecting two sockets " << std::endl;
257     for (int i = 0; i < 2; ++i)
258     {
259         const int sockid = srt_connect(ss, (sockaddr*) &sa, sizeof sa);
260         EXPECT_GT(sockid, 0) << "Socket " << i;
261         sa.sin_port = htons(4201); // Changing port so that second connect fails
262         std::cout << "Socket created: " << sockid << '\n';
263         ASSERT_NE(srt_epoll_add_usock(poll_id, sockid, &epoll_out), SRT_ERROR);
264     }
265     std::cout << "Returned from connecting two sockets " << std::endl;
266 
267     const int default_len = 3;
268     int rlen = default_len;
269     SRTSOCKET read[default_len];
270 
271     int wlen = default_len;
272     SRTSOCKET write[default_len];
273 
274     for (int j = 0; j < 2; ++j)
275     {
276         const int epoll_res = srt_epoll_wait(poll_id, read, &rlen,
277             write, &wlen,
278             5000, /* timeout */
279             0, 0, 0, 0);
280 
281         std::cout << "Epoll result: " << epoll_res << '\n';
282         std::cout << "Epoll rlen: " << rlen << ", wlen: " << wlen << '\n';
283         for (int i = 0; i < rlen; ++i)
284         {
285             std::cout << "Epoll read[" << i << "]: " << read[i] << '\n';
286         }
287         for (int i = 0; i < wlen; ++i)
288         {
289             std::cout << "Epoll write[" << i << "]: " << write[i] << " (removed from epoll)\n";
290             EXPECT_EQ(srt_epoll_remove_usock(poll_id, write[i]), 0);
291         }
292     }
293 
294     // Some basic checks for group stats
295     SRT_TRACEBSTATS stats;
296     EXPECT_EQ(srt_bstats(ss, &stats, true), SRT_SUCCESS);
297     EXPECT_EQ(stats.pktSent, 0);
298     EXPECT_EQ(stats.pktSentTotal, 0);
299     EXPECT_EQ(stats.pktSentUnique, 0);
300     EXPECT_EQ(stats.pktSentUniqueTotal, 0);
301     EXPECT_EQ(stats.pktRecv, 0);
302     EXPECT_EQ(stats.pktRecvTotal, 0);
303     EXPECT_EQ(stats.pktRecvUnique, 0);
304     EXPECT_EQ(stats.pktRecvUniqueTotal, 0);
305     EXPECT_EQ(stats.pktRcvDrop, 0);
306     EXPECT_EQ(stats.pktRcvDropTotal, 0);
307 
308     std::cout << "Starting thread for sending:\n";
309     std::thread sender([ss] {
310         char buf[1316];
311         memset(buf, 1, sizeof(buf));
312         int n = 0;
313         for (int i = 0; i < 10000; ++i)
314         {
315             std::this_thread::sleep_for(std::chrono::milliseconds(10));
316             if (srt_send(ss, buf, 1316) == -1)
317             {
318                 std::cout << "[Sender] sending failure, exitting after sending " << n << " packets\n";
319                 break;
320             }
321 
322             ++n;
323         }
324     });
325 
326     std::cout << "Will close sending in 300ms...\n";
327 
328     std::this_thread::sleep_for(std::chrono::milliseconds(300));
329 
330     EXPECT_EQ(srt_close(ss), 0) << "srt_close: %s\n" << srt_getlasterror_str();
331 
332     std::cout << "CLOSED GROUP. Now waiting for sender to exit...\n";
333     sender.join();
334     listen_promise.wait();
335 
336     srt_cleanup();
337 }
338 
339 #endif // ENABLE_EXPERIMENTAL_BONDING
340