1 /*
2  * Copyright (C) 2009-2020 Codership Oy <info@codership.com>
3  */
4 
5 #include "check_gcomm.hpp"
6 #include "gcomm/protostack.hpp"
7 #include "gcomm/conf.hpp"
8 
9 #include "gmcast.hpp"
10 #include "gmcast_message.hpp"
11 
12 #include "gu_asio.hpp" // gu::ssl_register_params()
13 
14 using namespace std;
15 using namespace gcomm;
16 using namespace gcomm::gmcast;
17 using namespace gu::datetime;
18 using gu::byte_t;
19 using gu::Buffer;
20 
21 #include <check.h>
22 
// Multicast coverage is opt-in: the multicast test case is registered, and
// the multicast parameters are added to transport URIs, only when this flag
// is set to true. It is false by default.
static bool test_multicast = false;
// URI query fragment naming the multicast group address and port.
std::string mcast_param =
    "gmcast.mcast_addr=239.192.0.11&gmcast.mcast_port=4567";
26 
27 
START_TEST(test_gmcast_multicast)28 START_TEST(test_gmcast_multicast)
29 {
30 
31     string uri1("gmcast://?gmcast.group=test&gmcast.mcast_addr=239.192.0.11");
32     gu::Config conf;
33     gu::ssl_register_params(conf);
34     gcomm::Conf::register_params(conf);
35     auto_ptr<Protonet> pnet(Protonet::create(conf));
36     Transport* gm1(Transport::create(*pnet, uri1));
37 
38     gm1->connect();
39     gm1->close();
40 
41     delete gm1;
42 }
43 END_TEST
44 
45 
START_TEST(test_gmcast_w_user_messages)46 START_TEST(test_gmcast_w_user_messages)
47 {
48     class User : public Toplay
49     {
50         Transport* tp_;
51         size_t recvd_;
52         Protostack pstack_;
53         explicit User(const User&);
54         void operator=(User&);
55 
56     public:
57 
58         User(Protonet& pnet,
59              const std::string& listen_addr,
60              const std::string& remote_addr) :
61             Toplay(pnet.conf()),
62             tp_(0),
63             recvd_(0),
64             pstack_()
65         {
66             string uri("gmcast://");
67             uri += remote_addr; // != 0 ? remote_addr : "";
68             uri += "?";
69             uri += "tcp.non_blocking=1";
70             uri += "&";
71             uri += "gmcast.group=testgrp";
72             uri += "&gmcast.time_wait=PT0.5S";
73             if (test_multicast == true)
74             {
75                 uri += "&" + mcast_param;
76             }
77             uri += "&gmcast.listen_addr=tcp://";
78             uri += listen_addr;
79 
80             tp_ = Transport::create(pnet, uri);
81         }
82 
83         ~User()
84         {
85             delete tp_;
86         }
87 
88         void start(const std::string& peer = "")
89         {
90             if (peer == "")
91             {
92                 tp_->connect();
93             }
94             else
95             {
96                 tp_->connect(peer);
97             }
98             pstack_.push_proto(tp_);
99             pstack_.push_proto(this);
100         }
101 
102 
103         void stop()
104         {
105             pstack_.pop_proto(this);
106             pstack_.pop_proto(tp_);
107             tp_->close();
108         }
109 
110         void handle_timer()
111         {
112             byte_t buf[16];
113             memset(buf, 0xa5, sizeof(buf));
114 
115             Datagram dg(Buffer(buf, buf + sizeof(buf)));
116 
117             send_down(dg, ProtoDownMeta());
118         }
119 
120         void handle_up(const void* cid, const Datagram& rb,
121                        const ProtoUpMeta& um)
122         {
123             if (rb.len() < rb.offset() + 16)
124             {
125                 gu_throw_fatal << "offset error";
126             }
127             char buf[16];
128             memset(buf, 0xa5, sizeof(buf));
129             // cppcheck-suppress uninitstring
130             if (memcmp(buf, &rb.payload()[0] + rb.offset(), 16) != 0)
131             {
132                 gu_throw_fatal << "content mismatch";
133             }
134             recvd_++;
135         }
136 
137         size_t recvd() const
138         {
139             return recvd_;
140         }
141 
142         void set_recvd(size_t val)
143         {
144             recvd_ = val;
145         }
146 
147         Protostack& pstack() { return pstack_; }
148 
149         std::string listen_addr() const
150         {
151             return tp_->listen_addr();
152         }
153     };
154 
155     log_info << "START";
156     gu::Config conf;
157     gu::ssl_register_params(conf);
158     gcomm::Conf::register_params(conf);
159     mark_point();
160     auto_ptr<Protonet> pnet(Protonet::create(conf));
161     mark_point();
162     User u1(*pnet, "127.0.0.1:0", "");
163     pnet->insert(&u1.pstack());
164 
165     log_info << "u1 start";
166     u1.start();
167 
168     pnet->event_loop(Sec/10);
169 
170     ck_assert(u1.recvd() == 0);
171 
172     log_info << "u2 start";
173     User u2(*pnet, "127.0.0.1:0",
174             u1.listen_addr().erase(0, strlen("tcp://")));
175     pnet->insert(&u2.pstack());
176 
177     u2.start();
178 
179     while (u1.recvd() <= 50 || u2.recvd() <= 50)
180     {
181         u1.handle_timer();
182         u2.handle_timer();
183         pnet->event_loop(Sec/10);
184     }
185 
186     log_info << "u3 start";
187     User u3(*pnet, "127.0.0.1:0",
188             u2.listen_addr().erase(0, strlen("tcp://")));
189     pnet->insert(&u3.pstack());
190     u3.start();
191 
192     while (u3.recvd() <= 50)
193     {
194         u1.handle_timer();
195         u2.handle_timer();
196         pnet->event_loop(Sec/10);
197     }
198 
199     log_info << "u4 start";
200     User u4(*pnet, "127.0.0.1:0",
201             u2.listen_addr().erase(0, strlen("tcp://")));
202     pnet->insert(&u4.pstack());
203     u4.start();
204 
205     while (u4.recvd() <= 50)
206     {
207         u1.handle_timer();
208         u2.handle_timer();
209         pnet->event_loop(Sec/10);
210     }
211 
212     log_info << "u1 stop";
213     u1.stop();
214     pnet->erase(&u1.pstack());
215 
216     pnet->event_loop(3*Sec);
217 
218     log_info << "u1 start";
219     pnet->insert(&u1.pstack());
220     u1.start(u2.listen_addr());
221 
222     u1.set_recvd(0);
223     u2.set_recvd(0);
224     u3.set_recvd(0);
225     u4.set_recvd(0);
226 
227     for (size_t i(0); i < 30; ++i)
228     {
229         u1.handle_timer();
230         u2.handle_timer();
231         pnet->event_loop(Sec/10);
232     }
233 
234     ck_assert(u1.recvd() != 0);
235     ck_assert(u2.recvd() != 0);
236     ck_assert(u3.recvd() != 0);
237     ck_assert(u4.recvd() != 0);
238 
239     pnet->erase(&u4.pstack());
240     pnet->erase(&u3.pstack());
241     pnet->erase(&u2.pstack());
242     pnet->erase(&u1.pstack());
243 
244     u1.stop();
245     u2.stop();
246     u3.stop();
247     u4.stop();
248 
249     pnet->event_loop(0);
250 
251 }
252 END_TEST
253 
254 
255 // not run by default, hard coded port
START_TEST(test_gmcast_auto_addr)256 START_TEST(test_gmcast_auto_addr)
257 {
258     log_info << "START";
259     gu::Config conf;
260     gu::ssl_register_params(conf);
261     gcomm::Conf::register_params(conf);
262     auto_ptr<Protonet> pnet(Protonet::create(conf));
263     Transport* tp1 = Transport::create(*pnet, "gmcast://?gmcast.group=test");
264     Transport* tp2 = Transport::create(*pnet, "gmcast://127.0.0.1:4567"
265               "?gmcast.group=test&gmcast.listen_addr=tcp://127.0.0.1:10002");
266 
267     pnet->insert(&tp1->pstack());
268     pnet->insert(&tp2->pstack());
269 
270     tp1->connect();
271     tp2->connect();
272 
273     pnet->event_loop(Sec);
274 
275     pnet->erase(&tp2->pstack());
276     pnet->erase(&tp1->pstack());
277 
278     tp1->close();
279     tp2->close();
280 
281     delete tp1;
282     delete tp2;
283 
284     pnet->event_loop(0);
285 
286 }
287 END_TEST
288 
289 
290 
START_TEST(test_gmcast_forget)291 START_TEST(test_gmcast_forget)
292 {
293     gu_conf_self_tstamp_on();
294     log_info << "START";
295     gu::Config conf;
296     gu::ssl_register_params(conf);
297     gcomm::Conf::register_params(conf);
298     auto_ptr<Protonet> pnet(Protonet::create(conf));
299     Transport* tp1 = Transport::create(*pnet, "gmcast://"
300                     "?gmcast.group=test&gmcast.listen_addr=tcp://127.0.0.1:0");
301     pnet->insert(&tp1->pstack());
302     tp1->connect();
303 
304     Transport* tp2 = Transport::create(*pnet,
305                                        std::string("gmcast://")
306                                        + tp1->listen_addr().erase(
307                                            0, strlen("tcp://"))
308                   + "?gmcast.group=test&gmcast.listen_addr=tcp://127.0.0.1:0");
309     Transport* tp3 = Transport::create(*pnet,
310                                        std::string("gmcast://")
311                                        + tp1->listen_addr().erase(
312                                            0, strlen("tcp://"))
313                   + "?gmcast.group=test&gmcast.listen_addr=tcp://127.0.0.1:0");
314 
315 
316     pnet->insert(&tp2->pstack());
317     pnet->insert(&tp3->pstack());
318 
319     tp2->connect();
320     tp3->connect();
321 
322     pnet->event_loop(Sec);
323 
324     UUID uuid1 = tp1->uuid();
325 
326     tp1->close();
327     tp2->close(uuid1);
328     tp3->close(uuid1);
329     pnet->event_loop(10*Sec);
330     tp1->connect();
331     // @todo Implement this using User class above and verify that
332     // tp2 and tp3 communicate with each other but now with tp1
333     log_info << "####";
334     pnet->event_loop(Sec);
335 
336     pnet->erase(&tp3->pstack());
337     pnet->erase(&tp2->pstack());
338     pnet->erase(&tp1->pstack());
339 
340     tp1->close();
341     tp2->close();
342     tp3->close();
343     delete tp1;
344     delete tp2;
345     delete tp3;
346 
347     pnet->event_loop(0);
348 
349 }
350 END_TEST
351 
352 
353 // not run by default, hard coded port
START_TEST(test_trac_380)354 START_TEST(test_trac_380)
355 {
356     gu_conf_self_tstamp_on();
357     log_info << "START (test_trac_380)";
358     gu::Config conf;
359     gu::ssl_register_params(conf);
360     gcomm::Conf::register_params(conf);
361     std::auto_ptr<gcomm::Protonet> pnet(gcomm::Protonet::create(conf));
362 
363     // caused either assertion or exception
364     gcomm::Transport* tp1(gcomm::Transport::create(
365                               *pnet,
366                               "gmcast://127.0.0.1:4567?"
367                               "gmcast.group=test"));
368     pnet->insert(&tp1->pstack());
369     tp1->connect();
370     try
371     {
372         pnet->event_loop(Sec);
373     }
374     catch (gu::Exception& e)
375     {
376         ck_assert_msg(e.get_errno() == EINVAL,
377                       "unexpected errno: %d, cause %s",
378                       e.get_errno(), e.what());
379     }
380     pnet->erase(&tp1->pstack());
381     tp1->close();
382     delete tp1;
383     pnet->event_loop(0);
384 }
385 END_TEST
386 
387 
START_TEST(test_trac_828)388 START_TEST(test_trac_828)
389 {
390     gu_conf_self_tstamp_on();
391     log_info << "START (test_trac_828)";
392     gu::Config conf;
393     gu::ssl_register_params(conf);
394     gcomm::Conf::register_params(conf);
395     std::auto_ptr<gcomm::Protonet> pnet(gcomm::Protonet::create(conf));
396 
397     // If the bug is present, this will throw because of own address being
398     // in address list.
399     try
400     {
401         Transport* tp(gcomm::Transport::create(
402                           *pnet,
403                           "gmcast://127.0.0.1:4567?"
404                           "gmcast.group=test&"
405                           "gmcast.listen_addr=tcp://127.0.0.1:4567"));
406         delete tp;
407     }
408     catch (gu::Exception& e)
409     {
410         ck_abort_msg("test_trac_828, expcetion thrown because of having own "
411                      "address in address list");
412     }
413 }
414 END_TEST
415 
START_TEST(test_gmcast_ipv6)416 START_TEST(test_gmcast_ipv6)
417 {
418     log_info << "START test_gmcast_ipv6";
419     gu::Config conf;
420     gu::ssl_register_params(conf);
421     gcomm::Conf::register_params(conf);
422     conf.set("base_host", "ip6-localhost");
423     gu_log_max_level = GU_LOG_DEBUG;
424     std::auto_ptr<gcomm::Protonet> pnet(gcomm::Protonet::create(conf));
425 
426     // Without scheme
427     {
428         std::auto_ptr<Transport> tp(gcomm::Transport::create(
429                                         *pnet,
430                                         "gmcast://[::1]:4567?"
431                                         "gmcast.group=test&"
432                                         "gmcast.listen_addr=tcp://[::1]:4567"));
433         tp->connect();
434         tp->close();
435     }
436 
437     {
438         std::auto_ptr<Transport> tp(gcomm::Transport::create(
439                                         *pnet,
440                                         "gmcast://ip6-localhost:4567?"
441                                         "gmcast.group=test&"
442                                         "gmcast.listen_addr=tcp://ip6-localhost:4567"));
443         tp->connect();
444         tp->close();
445     }
446 
447     {
448         std::auto_ptr<Transport> tp(gcomm::Transport::create(
449                                         *pnet,
450                                         "gmcast://[::1]?"
451                                         "gmcast.group=test&"
452                                         "gmcast.listen_addr=tcp://[::1]"));
453         tp->connect();
454         tp->close();
455     }
456 
457     {
458         std::auto_ptr<Transport> tp(gcomm::Transport::create(
459                                         *pnet,
460                                         "gmcast://ip6-localhost?"
461                                         "gmcast.group=test&"
462                                         "gmcast.listen_addr=tcp://ip6-localhost"));
463         tp->connect();
464         tp->close();
465     }
466     {
467         gcomm::Protolay::sync_param_cb_t spcb;
468         std::auto_ptr<Transport> tp(gcomm::Transport::create(
469                                         *pnet,
470                                         "gmcast://ip6-localhost?"
471                                         "gmcast.group=test&"
472                                         "gmcast.listen_addr=tcp://[2001:db8:10:9464::233]:4567"));
473         log_info << tp->configured_listen_addr();
474         log_info << conf;
475         ck_assert(tp->configured_listen_addr() == "tcp://[2001:db8:10:9464::233]:4567");
476     }
477     log_info << "END test_gmcast_ipv6";
478 }
479 END_TEST
480 
gmcast_suite()481 Suite* gmcast_suite()
482 {
483 
484     Suite* s = suite_create("gmcast");
485     TCase* tc;
486 
487     if (test_multicast == true)
488     {
489         tc = tcase_create("test_gmcast_multicast");
490         tcase_add_test(tc, test_gmcast_multicast);
491         suite_add_tcase(s, tc);
492     }
493 
494     tc = tcase_create("test_gmcast_w_user_messages");
495     tcase_add_test(tc, test_gmcast_w_user_messages);
496     tcase_set_timeout(tc, 30);
497     suite_add_tcase(s, tc);
498 
499     // not run by default, hard coded port
500     tc = tcase_create("test_gmcast_auto_addr");
501     tcase_add_test(tc, test_gmcast_auto_addr);
502     suite_add_tcase(s, tc);
503 
504     tc = tcase_create("test_gmcast_forget");
505     tcase_add_test(tc, test_gmcast_forget);
506     tcase_set_timeout(tc, 20);
507     suite_add_tcase(s, tc);
508 
509     // not run by default, hard coded port
510     tc = tcase_create("test_trac_380");
511     tcase_add_test(tc, test_trac_380);
512     suite_add_tcase(s, tc);
513 
514     tc = tcase_create("test_trac_828");
515     tcase_add_test(tc, test_trac_828);
516     suite_add_tcase(s, tc);
517 
518     tc = tcase_create("test_gmcast_ipv6");
519     tcase_add_test(tc, test_gmcast_ipv6);
520     suite_add_tcase(s, tc);
521 
522     return s;
523 
524 }
525