1 /*
2 * Copyright (C) 2009-2020 Codership Oy <info@codership.com>
3 */
4
5 #include "check_gcomm.hpp"
6 #include "gcomm/protostack.hpp"
7 #include "gcomm/conf.hpp"
8
9 #include "gmcast.hpp"
10 #include "gmcast_message.hpp"
11
12 #include "gu_asio.hpp" // gu::ssl_register_params()
13
14 using namespace std;
15 using namespace gcomm;
16 using namespace gcomm::gmcast;
17 using namespace gu::datetime;
18 using gu::byte_t;
19 using gu::Buffer;
20
21 #include <check.h>
22
// Multicast test(s) are not run by default; they are registered in the suite
// only while this switch is true.
static bool test_multicast = false;

// Query-string fragment appended to the transport URI when multicast testing
// is enabled.
std::string mcast_param = "gmcast.mcast_addr=239.192.0.11&gmcast.mcast_port=4567";
26
27
START_TEST(test_gmcast_multicast)28 START_TEST(test_gmcast_multicast)
29 {
30
31 string uri1("gmcast://?gmcast.group=test&gmcast.mcast_addr=239.192.0.11");
32 gu::Config conf;
33 gu::ssl_register_params(conf);
34 gcomm::Conf::register_params(conf);
35 unique_ptr<Protonet> pnet(Protonet::create(conf));
36 Transport* gm1(Transport::create(*pnet, uri1));
37
38 gm1->connect();
39 gm1->close();
40
41 delete gm1;
42 }
43 END_TEST
44
45
START_TEST(test_gmcast_w_user_messages)46 START_TEST(test_gmcast_w_user_messages)
47 {
48 class User : public Toplay
49 {
50 Transport* tp_;
51 size_t recvd_;
52 Protostack pstack_;
53 explicit User(const User&);
54 void operator=(User&);
55
56 public:
57
58 User(Protonet& pnet,
59 const std::string& listen_addr,
60 const std::string& remote_addr) :
61 Toplay(pnet.conf()),
62 tp_(0),
63 recvd_(0),
64 pstack_()
65 {
66 string uri("gmcast://");
67 uri += remote_addr; // != 0 ? remote_addr : "";
68 uri += "?";
69 uri += "tcp.non_blocking=1";
70 uri += "&";
71 uri += "gmcast.group=testgrp";
72 uri += "&gmcast.time_wait=PT0.5S";
73 if (test_multicast == true)
74 {
75 uri += "&" + mcast_param;
76 }
77 uri += "&gmcast.listen_addr=tcp://";
78 uri += listen_addr;
79
80 tp_ = Transport::create(pnet, uri);
81 }
82
83 ~User()
84 {
85 delete tp_;
86 }
87
88 void start(const std::string& peer = "")
89 {
90 if (peer == "")
91 {
92 tp_->connect();
93 }
94 else
95 {
96 tp_->connect(peer);
97 }
98 pstack_.push_proto(tp_);
99 pstack_.push_proto(this);
100 }
101
102
103 void stop()
104 {
105 pstack_.pop_proto(this);
106 pstack_.pop_proto(tp_);
107 tp_->close();
108 }
109
110 void handle_timer()
111 {
112 byte_t buf[16];
113 memset(buf, 0xa5, sizeof(buf));
114
115 Datagram dg(Buffer(buf, buf + sizeof(buf)));
116
117 send_down(dg, ProtoDownMeta());
118 }
119
120 void handle_up(const void* cid, const Datagram& rb,
121 const ProtoUpMeta& um)
122 {
123 if (rb.len() < rb.offset() + 16)
124 {
125 gu_throw_fatal << "offset error";
126 }
127 char buf[16];
128 memset(buf, 0xa5, sizeof(buf));
129 // cppcheck-suppress uninitstring
130 if (memcmp(buf, &rb.payload()[0] + rb.offset(), 16) != 0)
131 {
132 gu_throw_fatal << "content mismatch";
133 }
134 recvd_++;
135 }
136
137 size_t recvd() const
138 {
139 return recvd_;
140 }
141
142 void set_recvd(size_t val)
143 {
144 recvd_ = val;
145 }
146
147 Protostack& pstack() { return pstack_; }
148
149 std::string listen_addr() const
150 {
151 return tp_->listen_addr();
152 }
153 };
154
155 log_info << "START";
156 gu::Config conf;
157 gu::ssl_register_params(conf);
158 gcomm::Conf::register_params(conf);
159 mark_point();
160 unique_ptr<Protonet> pnet(Protonet::create(conf));
161 mark_point();
162 User u1(*pnet, "127.0.0.1:0", "");
163 pnet->insert(&u1.pstack());
164
165 log_info << "u1 start";
166 u1.start();
167
168 pnet->event_loop(Sec/10);
169
170 ck_assert(u1.recvd() == 0);
171
172 log_info << "u2 start";
173 User u2(*pnet, "127.0.0.1:0",
174 u1.listen_addr().erase(0, strlen("tcp://")));
175 pnet->insert(&u2.pstack());
176
177 u2.start();
178
179 while (u1.recvd() <= 50 || u2.recvd() <= 50)
180 {
181 u1.handle_timer();
182 u2.handle_timer();
183 pnet->event_loop(Sec/10);
184 }
185
186 log_info << "u3 start";
187 User u3(*pnet, "127.0.0.1:0",
188 u2.listen_addr().erase(0, strlen("tcp://")));
189 pnet->insert(&u3.pstack());
190 u3.start();
191
192 while (u3.recvd() <= 50)
193 {
194 u1.handle_timer();
195 u2.handle_timer();
196 pnet->event_loop(Sec/10);
197 }
198
199 log_info << "u4 start";
200 User u4(*pnet, "127.0.0.1:0",
201 u2.listen_addr().erase(0, strlen("tcp://")));
202 pnet->insert(&u4.pstack());
203 u4.start();
204
205 while (u4.recvd() <= 50)
206 {
207 u1.handle_timer();
208 u2.handle_timer();
209 pnet->event_loop(Sec/10);
210 }
211
212 log_info << "u1 stop";
213 u1.stop();
214 pnet->erase(&u1.pstack());
215
216 pnet->event_loop(3*Sec);
217
218 log_info << "u1 start";
219 pnet->insert(&u1.pstack());
220 u1.start(u2.listen_addr());
221
222 u1.set_recvd(0);
223 u2.set_recvd(0);
224 u3.set_recvd(0);
225 u4.set_recvd(0);
226
227 for (size_t i(0); i < 30; ++i)
228 {
229 u1.handle_timer();
230 u2.handle_timer();
231 pnet->event_loop(Sec/10);
232 }
233
234 ck_assert(u1.recvd() != 0);
235 ck_assert(u2.recvd() != 0);
236 ck_assert(u3.recvd() != 0);
237 ck_assert(u4.recvd() != 0);
238
239 pnet->erase(&u4.pstack());
240 pnet->erase(&u3.pstack());
241 pnet->erase(&u2.pstack());
242 pnet->erase(&u1.pstack());
243
244 u1.stop();
245 u2.stop();
246 u3.stop();
247 u4.stop();
248
249 pnet->event_loop(0);
250
251 }
252 END_TEST
253
254
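// Uses hard coded ports (4567 and 10002).
// NOTE(review): this was marked "not run by default", yet gmcast_suite()
// registers the test unconditionally - confirm which one is intended.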
START_TEST(test_gmcast_auto_addr)256 START_TEST(test_gmcast_auto_addr)
257 {
258 log_info << "START";
259 gu::Config conf;
260 gu::ssl_register_params(conf);
261 gcomm::Conf::register_params(conf);
262 unique_ptr<Protonet> pnet(Protonet::create(conf));
263 Transport* tp1 = Transport::create(*pnet, "gmcast://?gmcast.group=test");
264 Transport* tp2 = Transport::create(*pnet, "gmcast://127.0.0.1:4567"
265 "?gmcast.group=test&gmcast.listen_addr=tcp://127.0.0.1:10002");
266
267 pnet->insert(&tp1->pstack());
268 pnet->insert(&tp2->pstack());
269
270 tp1->connect();
271 tp2->connect();
272
273 pnet->event_loop(Sec);
274
275 pnet->erase(&tp2->pstack());
276 pnet->erase(&tp1->pstack());
277
278 tp1->close();
279 tp2->close();
280
281 delete tp1;
282 delete tp2;
283
284 pnet->event_loop(0);
285
286 }
287 END_TEST
288
289
290
START_TEST(test_gmcast_forget)291 START_TEST(test_gmcast_forget)
292 {
293 gu_conf_self_tstamp_on();
294 log_info << "START";
295 gu::Config conf;
296 gu::ssl_register_params(conf);
297 gcomm::Conf::register_params(conf);
298 unique_ptr<Protonet> pnet(Protonet::create(conf));
299 Transport* tp1 = Transport::create(*pnet, "gmcast://"
300 "?gmcast.group=test&gmcast.listen_addr=tcp://127.0.0.1:0");
301 pnet->insert(&tp1->pstack());
302 tp1->connect();
303
304 Transport* tp2 = Transport::create(*pnet,
305 std::string("gmcast://")
306 + tp1->listen_addr().erase(
307 0, strlen("tcp://"))
308 + "?gmcast.group=test&gmcast.listen_addr=tcp://127.0.0.1:0");
309 Transport* tp3 = Transport::create(*pnet,
310 std::string("gmcast://")
311 + tp1->listen_addr().erase(
312 0, strlen("tcp://"))
313 + "?gmcast.group=test&gmcast.listen_addr=tcp://127.0.0.1:0");
314
315
316 pnet->insert(&tp2->pstack());
317 pnet->insert(&tp3->pstack());
318
319 tp2->connect();
320 tp3->connect();
321
322 pnet->event_loop(Sec);
323
324 UUID uuid1 = tp1->uuid();
325
326 tp1->close();
327 tp2->close(uuid1);
328 tp3->close(uuid1);
329 pnet->event_loop(10*Sec);
330 tp1->connect();
331 // @todo Implement this using User class above and verify that
332 // tp2 and tp3 communicate with each other but now with tp1
333 log_info << "####";
334 pnet->event_loop(Sec);
335
336 pnet->erase(&tp3->pstack());
337 pnet->erase(&tp2->pstack());
338 pnet->erase(&tp1->pstack());
339
340 tp1->close();
341 tp2->close();
342 tp3->close();
343 delete tp1;
344 delete tp2;
345 delete tp3;
346
347 pnet->event_loop(0);
348
349 }
350 END_TEST
351
352
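// Uses a hard coded port (4567).
// NOTE(review): this was marked "not run by default", yet gmcast_suite()
// registers the test unconditionally - confirm which one is intended.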
START_TEST(test_trac_380)354 START_TEST(test_trac_380)
355 {
356 gu_conf_self_tstamp_on();
357 log_info << "START (test_trac_380)";
358 gu::Config conf;
359 gu::ssl_register_params(conf);
360 gcomm::Conf::register_params(conf);
361 std::unique_ptr<gcomm::Protonet> pnet(gcomm::Protonet::create(conf));
362
363 // caused either assertion or exception
364 gcomm::Transport* tp1(gcomm::Transport::create(
365 *pnet,
366 "gmcast://127.0.0.1:4567?"
367 "gmcast.group=test"));
368 pnet->insert(&tp1->pstack());
369 tp1->connect();
370 try
371 {
372 pnet->event_loop(Sec);
373 }
374 catch (gu::Exception& e)
375 {
376 ck_assert_msg(e.get_errno() == EINVAL,
377 "unexpected errno: %d, cause %s",
378 e.get_errno(), e.what());
379 }
380 pnet->erase(&tp1->pstack());
381 tp1->close();
382 delete tp1;
383 pnet->event_loop(0);
384 }
385 END_TEST
386
387
START_TEST(test_trac_828)388 START_TEST(test_trac_828)
389 {
390 gu_conf_self_tstamp_on();
391 log_info << "START (test_trac_828)";
392 gu::Config conf;
393 gu::ssl_register_params(conf);
394 gcomm::Conf::register_params(conf);
395 std::unique_ptr<gcomm::Protonet> pnet(gcomm::Protonet::create(conf));
396
397 // If the bug is present, this will throw because of own address being
398 // in address list.
399 try
400 {
401 Transport* tp(gcomm::Transport::create(
402 *pnet,
403 "gmcast://127.0.0.1:4567?"
404 "gmcast.group=test&"
405 "gmcast.listen_addr=tcp://127.0.0.1:4567"));
406 delete tp;
407 }
408 catch (gu::Exception& e)
409 {
410 ck_abort_msg("test_trac_828, expcetion thrown because of having own "
411 "address in address list");
412 }
413 }
414 END_TEST
415
START_TEST(test_gmcast_ipv6)416 START_TEST(test_gmcast_ipv6)
417 {
418 log_info << "START test_gmcast_ipv6";
419 gu::Config conf;
420 gu::ssl_register_params(conf);
421 gcomm::Conf::register_params(conf);
422 conf.set("base_host", "ip6-localhost");
423 gu_log_max_level = GU_LOG_DEBUG;
424 std::unique_ptr<gcomm::Protonet> pnet(gcomm::Protonet::create(conf));
425
426 // Without scheme
427 {
428 std::unique_ptr<Transport> tp(gcomm::Transport::create(
429 *pnet,
430 "gmcast://[::1]:4567?"
431 "gmcast.group=test&"
432 "gmcast.listen_addr=tcp://[::1]:4567"));
433 tp->connect();
434 tp->close();
435 }
436
437 {
438 std::unique_ptr<Transport> tp(gcomm::Transport::create(
439 *pnet,
440 "gmcast://ip6-localhost:4567?"
441 "gmcast.group=test&"
442 "gmcast.listen_addr=tcp://ip6-localhost:4567"));
443 tp->connect();
444 tp->close();
445 }
446
447 {
448 std::unique_ptr<Transport> tp(gcomm::Transport::create(
449 *pnet,
450 "gmcast://[::1]?"
451 "gmcast.group=test&"
452 "gmcast.listen_addr=tcp://[::1]"));
453 tp->connect();
454 tp->close();
455 }
456
457 {
458 std::unique_ptr<Transport> tp(gcomm::Transport::create(
459 *pnet,
460 "gmcast://ip6-localhost?"
461 "gmcast.group=test&"
462 "gmcast.listen_addr=tcp://ip6-localhost"));
463 tp->connect();
464 tp->close();
465 }
466 {
467 gcomm::Protolay::sync_param_cb_t spcb;
468 std::unique_ptr<Transport> tp(gcomm::Transport::create(
469 *pnet,
470 "gmcast://ip6-localhost?"
471 "gmcast.group=test&"
472 "gmcast.listen_addr=tcp://[2001:db8:10:9464::233]:4567"));
473 log_info << tp->configured_listen_addr();
474 log_info << conf;
475 ck_assert(tp->configured_listen_addr() == "tcp://[2001:db8:10:9464::233]:4567");
476 }
477 log_info << "END test_gmcast_ipv6";
478 }
479 END_TEST
480
gmcast_suite()481 Suite* gmcast_suite()
482 {
483
484 Suite* s = suite_create("gmcast");
485 TCase* tc;
486
487 if (test_multicast == true)
488 {
489 tc = tcase_create("test_gmcast_multicast");
490 tcase_add_test(tc, test_gmcast_multicast);
491 suite_add_tcase(s, tc);
492 }
493
494 tc = tcase_create("test_gmcast_w_user_messages");
495 tcase_add_test(tc, test_gmcast_w_user_messages);
496 tcase_set_timeout(tc, 30);
497 suite_add_tcase(s, tc);
498
499 // not run by default, hard coded port
500 tc = tcase_create("test_gmcast_auto_addr");
501 tcase_add_test(tc, test_gmcast_auto_addr);
502 suite_add_tcase(s, tc);
503
504 tc = tcase_create("test_gmcast_forget");
505 tcase_add_test(tc, test_gmcast_forget);
506 tcase_set_timeout(tc, 20);
507 suite_add_tcase(s, tc);
508
509 // not run by default, hard coded port
510 tc = tcase_create("test_trac_380");
511 tcase_add_test(tc, test_trac_380);
512 suite_add_tcase(s, tc);
513
514 tc = tcase_create("test_trac_828");
515 tcase_add_test(tc, test_trac_828);
516 suite_add_tcase(s, tc);
517
518 tc = tcase_create("test_gmcast_ipv6");
519 tcase_add_test(tc, test_gmcast_ipv6);
520 suite_add_tcase(s, tc);
521
522 return s;
523
524 }
525