1 /*
2 * Copyright (C) 2008-2020 Codership Oy <info@codership.com>
3 *
4 * $Id$
5 */
6
7 /*
8 * @file
9 *
10 * Defines unit tests for gcs_core (and as a result tests gcs_group and
11 * a dummy backend which gcs_core depends on)
12 *
13 * Most of the checks require independent sending and receiving threads.
14 * Approach 1 is to start separate threads for both sending and receiving
15 * and use the current thread of execution to sychronize between them:
16 *
17 * CORE_RECV_START(act_r)
18 * CORE_SEND_START(act_s)
19 * while (gcs_core_send_step(Core)) { // step through action fragments
20 * (do something)
21 * };
22 * CORE_SEND_END(act_s, ret) // check return code
23 * CORE_RECV_END(act_r, size, type) // makes checks against size and type
24 *
25 * A simplified approach 2 is:
26 *
27 * CORE_SEND_START(act_s)
28 * while (gcs_core_send_step(Core)) { // step through action fragments
29 * (do something)
30 * };
31 * CORE_SEND_END(act_s, ret) // check return code
32 * CORE_RECV_ACT(act_r, size, type) // makes checks agains size and type
33 *
34 * In the first approach group messages will be received concurrently.
35 * In the second approach messages will wait in queue and be fetched afterwards
36 *
37 */
38
39 #define GCS_STATE_MSG_ACCESS
40 #include "../gcs_core.hpp"
41 #include "../gcs_dummy.hpp"
42 #include "../gcs_seqno.hpp"
43 #include "../gcs_state_msg.hpp"
44 #include "../gcs_code_msg.hpp"
45
46 #include <galerautils.h>
47 #include "gu_config.hpp"
48
49 #include "gcs_test_utils.hpp"
50 #include "gcs_core_test.hpp" // must be included last
51
START_TEST(gcs_code_msg)52 START_TEST(gcs_code_msg)
53 {
54 gu::UUID const u0(NULL, 0);
55 gcs_seqno_t const s0(1234);
56 uint64_t const c0(4312);
57
58 gcs::core::CodeMsg cm0(gu::GTID(u0, s0), c0);
59
60 const void* const buf(cm0());
61
62 const gcs::core::CodeMsg* const cm1
63 (static_cast<const gcs::core::CodeMsg*>(buf));
64
65 gu::UUID const u1(cm1->uuid());
66 gcs_seqno_t const s1(cm1->seqno());
67 uint64_t const c1(cm1->code());
68
69 ck_assert(u0 == u1);
70 ck_assert(s0 == s1);
71 ck_assert(c0 == c1);
72 }
73 END_TEST
74
75 extern ssize_t gcs_tests_get_allocated();
76
77 static const long UNKNOWN_SIZE = 1234567890; // some unrealistic number
78
79 static std::string const CacheName("core_test.cache");
80 static gcache::GCache* Cache = NULL;
81 static gcs_core_t* Core = NULL;
82 static gcs_backend_t* Backend = NULL;
83 static gcs_seqno_t Seqno = 0;
84 static gu::UUID Uuid;
85
86 typedef struct action {
87 const struct gu_buf* in;
88 void* out;
89 const void* local;
90 ssize_t size;
91 gcs_act_type_t type;
92 gcs_seqno_t seqno;
93 gu_thread_t thread;
94
actionaction95 action() { }
actionaction96 action(const struct gu_buf* a_in,
97 void* a_out,
98 const void* a_local,
99 ssize_t a_size,
100 gcs_act_type_t a_type,
101 gcs_seqno_t a_seqno,
102 gu_thread_t a_thread)
103 :
104 in (a_in),
105 out (a_out),
106 local (a_local),
107 size (a_size),
108 type (a_type),
109 seqno (a_seqno),
110 thread (a_thread)
111 { }
112 } action_t;
113
114 //static struct action_t RecvAct;
115 static const ssize_t FRAG_SIZE = 4; // desirable action fragment size
116
117 // 1-fragment action
118 static const char act1_str[] = "101";
119 static const struct gu_buf act1[1] = {
120 { act1_str, sizeof(act1_str) }
121 };
122
123 // 2-fragment action, with buffers aligned with FRAG_SIZE
124 static const char act2_str[] = "202122";
125 static const struct gu_buf act2[2] = {
126 { "2021", 4 },
127 { "22", 3 } /* 4 + 3 = 7 = sizeof(act2_str) */
128 };
129
130 // 3-fragment action, with unaligned buffers
131 static const char act3_str[] = "3031323334";
132 static const struct gu_buf act3[] = {
133 { "303", 3 },
134 { "13", 2 },
135 { "23", 2 },
136 { "334", 4 } /* 3 + 2 + 2 + 4 = 11 = sizeof(act3_str) */
137 };
138
139
140 // action receive thread, returns after first action received, stores action
141 // in the passed action_t object, uses global Core to receive
142 static void*
core_recv_thread(void * arg)143 core_recv_thread (void* arg)
144 {
145 action_t* act = (action_t*)arg;
146
147 // @todo: refactor according to new gcs_act types
148 struct gcs_act_rcvd recv_act;
149
150 act->size = gcs_core_recv (Core, &recv_act, GU_TIME_ETERNITY);
151 act->out = (void*)recv_act.act.buf;
152 act->local = recv_act.local;
153 act->type = recv_act.act.type;
154 act->seqno = recv_act.id;
155
156 return (NULL);
157 }
158
159 // this macro logs errors from within a function
160 #define FAIL_IF(expr, format, ...) \
161 if (expr) { \
162 gu_fatal ("FAIL: " format, __VA_ARGS__); \
163 ck_assert_msg(false, format, __VA_ARGS__); \
164 return true; \
165 }
166
167 // Start a thread to receive an action
168 // args: action_t object
CORE_RECV_START(action_t * act)169 static inline bool CORE_RECV_START(action_t* act)
170 {
171 return (0 != gu_thread_create (&act->thread, NULL,
172 core_recv_thread, act));
173 }
174
COMMON_RECV_CHECKS(action_t * act,const char * buf,int size,gcs_act_type_t type,gcs_seqno_t * seqno)175 static bool COMMON_RECV_CHECKS(action_t* act,
176 const char* buf,
177 int size,
178 gcs_act_type_t type,
179 gcs_seqno_t* seqno)
180 {
181 FAIL_IF (size != UNKNOWN_SIZE && size != act->size,
182 "gcs_core_recv(): expected size %d, returned %zd (%s)",
183 size, act->size, strerror (-act->size));
184 FAIL_IF (act->type != type,
185 "type does not match: expected %d, got %d", type, act->type);
186 FAIL_IF (act->size > 0 && act->out == NULL,
187 "null buffer received with positive size: %zu", act->size);
188
189 if (act->type == GCS_ACT_STATE_REQ) return false;
190
191 // action is ordered only if it is of type GCS_ACT_WRITESET or
192 // GCS_ACT_CCHANGE and not an error
193 if (act->seqno > GCS_SEQNO_ILL) {
194 FAIL_IF (GCS_ACT_WRITESET != act->type && GCS_ACT_CCHANGE != act->type
195 && act->seqno > 0,
196 "GCS_ACT_WRITESET != act->type (%d), while act->seqno: %lld",
197 act->type, (long long)act->seqno);
198
199 if (GCS_ACT_WRITESET == act->type)
200 {
201 assert((*seqno + 1) == act->seqno);
202 FAIL_IF ((*seqno + 1) != act->seqno,
203 "expected seqno %lld, got %lld",
204 (long long)(*seqno + 1), (long long)act->seqno);
205 *seqno = *seqno + 1;
206 }
207 else if (GCS_ACT_CCHANGE == act->type)
208 {
209 FAIL_IF (act->seqno < 0, "Negative seqno: %lld",
210 (long long)act->seqno);
211
212 Uuid = gcs_core_get_group(Core)->group_uuid;
213
214 if (gcs_core_proto_ver(Core) >= 1) *seqno = *seqno + 1;
215 }
216 }
217
218 if (NULL != buf) {
219 if (GCS_ACT_WRITESET == act->type) {
220 // local action buffer should not be copied
221 ck_assert_msg(act->local == act->in,
222 "Received buffer ptr is not the same as sent: "
223 "%p != %p", act->in, act->local);
224 ck_assert_msg(!memcmp(buf, act->out, act->size),
225 "Received buffer contents is not the same as sent: "
226 "'%s' != '%s'", buf, (char*)act->out);
227 }
228 else {
229 ck_assert_msg(act->local != buf,
230 "Received the same buffer ptr as sent");
231 ck_assert_msg(!memcmp(buf, act->out, act->size),
232 "Received buffer contents is not the same as sent");
233 }
234 }
235
236 return false;
237 }
238
239 // Wait for recv thread to complete, perform required checks
240 // args: action_t, expected size, expected type
CORE_RECV_END(action_t * act,const void * buf,ssize_t size,gcs_act_type_t type)241 static bool CORE_RECV_END(action_t* act,
242 const void* buf,
243 ssize_t size,
244 gcs_act_type_t type)
245 {
246 {
247 int ret = gu_thread_join (act->thread, NULL);
248 act->thread = (gu_thread_t)-1;
249 FAIL_IF(0 != ret, "Failed to join recv thread: %d (%s)",
250 ret, strerror (ret));
251 }
252
253 return COMMON_RECV_CHECKS (act, (const char*)buf, size, type, &Seqno);
254 }
255
256 // Receive action in one call, perform required checks
257 // args: pointer to action_t, expected size, expected type
CORE_RECV_ACT(action_t * act,const void * buf,ssize_t size,gcs_act_type_t type)258 static bool CORE_RECV_ACT (action_t* act,
259 const void* buf, // single buffer action repres.
260 ssize_t size,
261 gcs_act_type_t type)
262 {
263 struct gcs_act_rcvd recv_act;
264
265 act->size = gcs_core_recv (Core, &recv_act, GU_TIME_ETERNITY);
266 act->out = (void*)recv_act.act.buf;
267 act->local = recv_act.local;
268 act->type = recv_act.act.type;
269 act->seqno = recv_act.id;
270
271 return COMMON_RECV_CHECKS (act, (const char*)buf, size, type, &Seqno);
272 }
273
274 // Sending always needs to be done via separate thread (uses lock-stepping)
275 void*
core_send_thread(void * arg)276 core_send_thread (void* arg)
277 {
278 action_t* act = (action_t*)arg;
279
280 // use seqno field to pass the return code, it is signed 8-byte integer
281 act->seqno = gcs_core_send (Core, act->in, act->size, act->type);
282
283 return (NULL);
284 }
285
286 // Start a thread to send an action
287 // args: action_t object
CORE_SEND_START(action_t * act)288 static bool CORE_SEND_START(action_t* act)
289 {
290 return (0 != gu_thread_create (&act->thread, NULL,
291 core_send_thread, act));
292 }
293
294 // Wait for send thread to complete, perform required checks
295 // args: action_t, expected return code
CORE_SEND_END(action_t * act,long ret)296 static bool CORE_SEND_END(action_t* act, long ret)
297 {
298 {
299 long _ret = gu_thread_join (act->thread, NULL);
300 act->thread = (gu_thread_t)-1;
301 ck_assert_msg(0 == _ret, "Failed to join recv thread: %ld (%s)",
302 _ret, strerror (_ret));
303 }
304
305 ck_assert_msg(ret == act->seqno,
306 "gcs_core_send(): expected %lld, returned %lld (%s)",
307 (long long) ret, (long long) act->seqno, strerror (-act->seqno));
308
309 return false;
310 }
311
312 // check if configuration is the one that we expected
313 static long
core_test_check_conf(const void * const conf_msg,int const conf_size,bool const prim,long const my_idx,size_t const memb_num)314 core_test_check_conf (const void* const conf_msg, int const conf_size,
315 bool const prim, long const my_idx, size_t const memb_num)
316 {
317 long ret = 0;
318
319 gcs_act_cchange const conf(conf_msg, conf_size);
320
321 if ((conf.conf_id >= 0) != prim) {
322 gu_error ("Expected %s conf, received %s",
323 prim ? "PRIMARY" : "NON-PRIMARY",
324 (conf.conf_id >= 0) ? "PRIMARY" : "NON-PRIMARY");
325 ret = -1;
326 }
327
328 if (conf.memb.size() != memb_num) {
329 gu_error ("Expected memb_num = %zd, got %zd", memb_num,conf.memb.size());
330 ret = -1;
331 }
332
333 return ret;
334 }
335
336 static long
core_test_set_payload_size(ssize_t s)337 core_test_set_payload_size (ssize_t s)
338 {
339 long ret;
340 const ssize_t arbitrary_pkt_size = s + 64; // big enough for payload to fit
341
342 ret = gcs_core_set_pkt_size (Core, arbitrary_pkt_size);
343 if (ret <= 0) {
344 gu_error("set_pkt_size(%zd) returned: %ld (%s)", arbitrary_pkt_size,
345 ret, strerror (-ret));
346 return ret;
347 }
348
349 ret = gcs_core_set_pkt_size (Core, arbitrary_pkt_size - ret + s);
350 if (ret != s) {
351 gu_error("set_pkt_size() returned: %ld instead of %zd", ret, s);
352 return ret;
353 }
354
355 return 0;
356 }
357
358 // Initialises core and backend objects + some common tests
359 static inline void
core_test_init(gu::Config * config,bool bootstrap=true,int const gcs_proto_ver=1)360 core_test_init (gu::Config* config,
361 bool bootstrap = true, int const gcs_proto_ver = 1)
362 {
363 long ret;
364 action_t act;
365
366 mark_point();
367
368 ck_assert(config != NULL);
369
370 gcs_test::InitConfig(*config, CacheName);
371
372 Cache = new gcache::GCache(*config, ".");
373
374 Core = gcs_core_create (reinterpret_cast<gu_config_t*>(config),
375 reinterpret_cast<gcache_t*>(Cache),
376 "core_test", "aaa.bbb.ccc.ddd:xxxx", 0, 0,
377 gcs_proto_ver);
378
379 ck_assert(NULL != Core);
380
381 Backend = gcs_core_get_backend (Core);
382 ck_assert(NULL != Backend);
383
384 Seqno = 0; // reset seqno
385
386 ret = core_test_set_payload_size (FRAG_SIZE);
387 ck_assert_msg(-EBADFD == ret, "Expected -EBADFD, got: %ld (%s)",
388 ret, strerror(-ret));
389
390 ret = gcs_core_open (Core, "yadda-yadda", "owkmevc", 1);
391 ck_assert_msg(-EINVAL == ret, "Expected -EINVAL, got %ld (%s)",
392 ret, strerror(-ret));
393
394 ret = gcs_core_open (Core, "yadda-yadda", "dummy://", bootstrap);
395 ck_assert_msg(0 == ret, "Failed to open core connection: %ld (%s)",
396 ret, strerror(-ret));
397
398 if (!bootstrap) {
399 gcs_core_send_lock_step (Core, true);
400 mark_point();
401 return;
402 }
403
404 // receive first configuration message
405 ck_assert(!CORE_RECV_ACT (&act, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
406 ck_assert(!core_test_check_conf(act.out, act.size, bootstrap, 0, 1));
407 Cache->free(act.out);
408
409 int const ver(gcs_core_proto_ver(Core));
410 ck_assert_msg(ver == gcs_proto_ver,"Expected protocol version: %d, got: %d",
411 gcs_proto_ver, ver);
412
413 // this will configure backend to have desired fragment size
414 ret = core_test_set_payload_size (FRAG_SIZE);
415 ck_assert_msg(0 == ret, "Failed to set up the message payload size: %ld (%s)",
416 ret, strerror(-ret));
417
418 // try to send an action to check that everything's alright
419 ret = gcs_core_send (Core, act1, sizeof(act1_str), GCS_ACT_WRITESET);
420 ck_assert_msg(ret == sizeof(act1_str), "Expected %zu, got %ld (%s)",
421 sizeof(act1_str), ret, strerror (-ret));
422 gu_warn ("Next CORE_RECV_ACT fails under valgrind");
423 act.in = act1;
424 ck_assert(!CORE_RECV_ACT(&act, act1_str, sizeof(act1_str), GCS_ACT_WRITESET));
425
426 ret = gcs_core_send_join (Core, gu::GTID(Uuid, Seqno), 0);
427 ck_assert_msg(ret >= 0, "gcs_core_send_join(): %ld (%s)",
428 ret, strerror(-ret));
429 // no action to be received (we're joined already)
430
431 ret = gcs_core_send_sync (Core, gu::GTID(Uuid, Seqno));
432
433 int const proto(gcs_core_proto_ver(Core));
434 ck_assert(proto == gcs_proto_ver); // checking just in case
435
436 int const expected_ret
437 (proto >= 1 ? gcs::core::CodeMsg::serial_size() : sizeof(gcs_seqno_t));
438 ck_assert_msg(ret == expected_ret,
439 "gcs_core_send_sync(): %ld (%s)", ret, strerror(-ret));
440
441 ck_assert(!CORE_RECV_ACT(&act, NULL, sizeof(gcs_seqno_t), GCS_ACT_SYNC));
442
443 gcs_seqno_t const s(gcs_seqno_gtoh(*(gcs_seqno_t*)act.out));
444
445 int const expected_s(proto >= 1 ? 0 : Seqno);
446 ck_assert_msg(s == expected_s, "Expected code %lld, got %lld",
447 (long long)expected_s, (long long)s);
448
449 gcs_core_send_lock_step (Core, true);
450 mark_point();
451 }
452
453 // cleans up core and backend objects
454 static inline void
core_test_cleanup()455 core_test_cleanup ()
456 {
457 long ret;
458 char tmp[1];
459 action_t act;
460
461 ck_assert(NULL != Core);
462 ck_assert(NULL != Backend);
463
464 // to fetch self-leave message
465 ck_assert(!CORE_RECV_START (&act));
466 ret = gcs_core_close (Core);
467 ck_assert_msg(0 == ret, "Failed to close core: %ld (%s)",
468 ret, strerror (-ret));
469 ret = CORE_RECV_END (&act, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE);
470 ck_assert_msg(0 == ret, "ret: %ld (%s)", ret, strerror(-ret));
471 Cache->free(act.out);
472
473 // check that backend is closed too
474 ret = Backend->send (Backend, tmp, sizeof(tmp), GCS_MSG_ACTION);
475 ck_assert(ret == -EBADFD);
476
477 ret = gcs_core_destroy (Core);
478 ck_assert_msg(0 == ret, "Failed to destroy core: %ld (%s)",
479 ret, strerror (-ret));
480
481 {
482 ssize_t allocated;
483 allocated = gcs_tests_get_allocated();
484 ck_assert_msg(0 == allocated,
485 "Expected 0 allocated bytes, found %zd", allocated);
486 }
487
488 delete Cache;
489 ::unlink(CacheName.c_str());
490 }
491
492 // just a smoke test for core API
START_TEST(gcs_core_test_api)493 START_TEST (gcs_core_test_api)
494 {
495 gu::Config config;
496 core_test_init (&config);
497
498 ck_assert(NULL != Cache);
499 ck_assert(NULL != Core);
500 ck_assert(NULL != Backend);
501
502 long ret;
503 long tout = 100; // 100 ms timeout
504 const struct gu_buf* act = act3;
505 const void* act_buf = act3_str;
506 size_t act_size = sizeof(act3_str);
507
508 action_t act_s(act, NULL, NULL, act_size, GCS_ACT_WRITESET, -1, (gu_thread_t)-1);
509 action_t act_r(act, NULL, NULL, -1, (gcs_act_type_t)-1, -1, (gu_thread_t)-1);
510 long i = 5;
511
512 // test basic fragmentaiton
513 while (i--) {
514 long frags = (act_size - 1)/FRAG_SIZE + 1;
515
516 gu_info ("Iteration %ld: act: %s, size: %zu, frags: %ld",
517 i, act, act_size, frags);
518
519 ck_assert(!CORE_SEND_START (&act_s));
520
521 while ((ret = gcs_core_send_step (Core, 3*tout)) > 0) {
522 frags--; gu_info ("frags: %ld", frags);
523 // usleep (1000);
524 }
525
526 ck_assert_msg(ret == 0, "gcs_core_send_step() returned: %ld (%s)",
527 ret, strerror(-ret));
528 ck_assert_msg(frags == 0, "frags = %ld, instead of 0", frags);
529 ck_assert(!CORE_SEND_END (&act_s, act_size));
530 ck_assert(!CORE_RECV_ACT (&act_r, act_buf, act_size, GCS_ACT_WRITESET));
531
532 ret = gcs_core_set_last_applied (Core, gu::GTID(Uuid, Seqno));
533 ck_assert_msg(ret >= 0, "gcs_core_set_last_applied(): %ld (%s)",
534 ret, strerror(-ret));
535 /* commit cut action size should be 8 */
536 ck_assert(!CORE_RECV_ACT (&act_r, NULL, 8, GCS_ACT_COMMIT_CUT));
537 ck_assert(Seqno == gcs_seqno_gtoh(*(gcs_seqno_t*)act_r.out));
538 free(act_r.out); // commit cut is allocated by malloc()
539 }
540
541 // send fake flow control action, its contents is not important
542 gcs_core_send_fc (Core, act, act_size);
543 ck_assert_msg(ret >= 0, "gcs_core_send_fc(): %ld (%s)",
544 ret, strerror(-ret));
545 ck_assert(!CORE_RECV_ACT(&act_r, act, act_size, GCS_ACT_FLOW));
546
547 core_test_cleanup ();
548 }
549 END_TEST
550
551 // do a single send step, compare with the expected result
552 static inline bool
CORE_SEND_STEP(gcs_core_t * core,long timeout,long ret)553 CORE_SEND_STEP (gcs_core_t* core, long timeout, long ret)
554 {
555 long err = gcs_core_send_step (core, timeout);
556 ck_assert_msg(err >= 0, "gcs_core_send_step(): %ld (%s)",
557 err, strerror (-err));
558 if (ret >= 0) {
559 ck_assert_msg(err == ret, "gcs_core_send_step(): expected %ld, got %ld",
560 ret, err);
561 }
562
563 return false;
564 }
565
566 static bool
DUMMY_INJECT_COMPONENT(gcs_backend_t * backend,const gcs_comp_msg_t * comp)567 DUMMY_INJECT_COMPONENT (gcs_backend_t* backend, const gcs_comp_msg_t* comp)
568 {
569 long ret = gcs_dummy_inject_msg (Backend, comp,
570 gcs_comp_msg_size(comp),
571 GCS_MSG_COMPONENT, GCS_SENDER_NONE);
572 ck_assert_msg(ret > 0, "gcs_dummy_inject_msg(): %ld (%s)",
573 ret, strerror(ret));
574
575 return false;
576 }
577
578 static bool
DUMMY_INSTALL_COMPONENT(gcs_backend_t * backend,const gcs_comp_msg_t * comp)579 DUMMY_INSTALL_COMPONENT (gcs_backend_t* backend, const gcs_comp_msg_t* comp)
580 {
581 bool primary = gcs_comp_msg_primary (comp);
582 long my_idx = gcs_comp_msg_self (comp);
583 long members = gcs_comp_msg_num (comp);
584
585 action_t act;
586
587 FAIL_IF (gcs_dummy_set_component(Backend, comp), "%s",
588 "gcs_dummy_set_component");
589 FAIL_IF (DUMMY_INJECT_COMPONENT (Backend, comp), "%s",
590 "DUMMT_INJECT_COMPONENT");
591 FAIL_IF (CORE_RECV_ACT (&act, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE),
592 "%s", "CORE_RECV_ACT");
593 FAIL_IF (core_test_check_conf(act.out, act.size, primary, my_idx, members),
594 "%s", "core_test_check_conf");
595 Cache->free(act.out);
596 return false;
597 }
598
599 static void
CORE_TEST_OWN(int gcs_proto_ver)600 CORE_TEST_OWN (int gcs_proto_ver)
601 {
602 long const tout = 1000; // 100 ms timeout
603
604 const struct gu_buf* act = act2;
605 const void* act_buf = act2_str;
606 size_t act_size = sizeof(act2_str);
607
608 action_t act_s(act, NULL, NULL, act_size, GCS_ACT_WRITESET, -1, (gu_thread_t)-1);
609 action_t act_r(act, NULL, NULL, -1, (gcs_act_type_t)-1, -1, (gu_thread_t)-1);
610
611 // Create primary and non-primary component messages
612 gcs_comp_msg_t* prim = gcs_comp_msg_new (true, false, 0, 1, 0);
613 gcs_comp_msg_t* non_prim = gcs_comp_msg_new (false, false, 0, 1, 0);
614 ck_assert(NULL != prim);
615 ck_assert(NULL != non_prim);
616 gcs_comp_msg_add (prim, "node1", 0);
617 gcs_comp_msg_add (non_prim, "node1", 1);
618
619 gu::Config config;
620 core_test_init (&config, true, gcs_proto_ver);
621
622 /////////////////////////////////////////////
623 /// check behaviour in transitional state ///
624 /////////////////////////////////////////////
625
626 ck_assert(!CORE_RECV_START (&act_r));
627 ck_assert(!CORE_SEND_START (&act_s));
628 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
629 usleep (10000); // resolve race between sending and setting transitional
630 gcs_dummy_set_transitional (Backend);
631 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag
632 ck_assert(!CORE_SEND_STEP (Core, tout, 0)); // no frags left
633 ck_assert(NULL == act_r.out); // should not have received anything
634 ck_assert(!gcs_dummy_set_component (Backend, prim)); // return to PRIM state
635 ck_assert(!CORE_SEND_END (&act_s, act_size));
636 ck_assert(!CORE_RECV_END (&act_r, act_buf, act_size, GCS_ACT_WRITESET));
637
638 /*
639 * TEST CASE 1: Action was sent successfully, but NON_PRIM component
640 * happened before any fragment could be delivered.
641 * EXPECTED OUTCOME: action is received with -ENOTCONN instead of global
642 * seqno
643 */
644 ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim));
645 ck_assert(!CORE_SEND_START (&act_s));
646 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
647 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag
648 ck_assert(!CORE_SEND_END (&act_s, act_size));
649 ck_assert(!gcs_dummy_set_component(Backend, non_prim));
650 ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
651 ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1));
652 Cache->free(act_r.out);
653 ck_assert(!CORE_RECV_ACT (&act_r, act_buf, act_size, GCS_ACT_WRITESET));
654 ck_assert_msg(-ENOTCONN == act_r.seqno,
655 "Expected -ENOTCONN, received %ld (%s)",
656 act_r.seqno, strerror (-act_r.seqno));
657
658 /*
659 * TEST CASE 2: core in NON_PRIM state. There is attempt to send an
660 * action.
661 * EXPECTED OUTCOME: CORE_SEND_END should return -ENOTCONN after 1st
662 * fragment send fails.
663 */
664 ck_assert(!CORE_SEND_START (&act_s));
665 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
666 ck_assert(!CORE_SEND_STEP (Core, tout, 0)); // bail out after 1st frag
667 ck_assert(!CORE_SEND_END (&act_s, -ENOTCONN));
668
669 /*
670 * TEST CASE 3: Backend in NON_PRIM state. There is attempt to send an
671 * action.
672 * EXPECTED OUTCOME: CORE_SEND_END should return -ENOTCONN after 1st
673 * fragment send fails.
674 */
675 ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim));
676 ck_assert(!gcs_dummy_set_component(Backend, non_prim));
677 ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim));
678 ck_assert(!CORE_SEND_START (&act_s));
679 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
680 ck_assert(!CORE_SEND_END (&act_s, -ENOTCONN));
681 ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
682 ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1));
683 Cache->free(act_r.out);
684
685 /*
686 * TEST CASE 4: Action was sent successfully, but NON_PRIM component
687 * happened in between delivered fragments.
688 * EXPECTED OUTCOME: action is received with -ENOTCONN instead of global
689 * seqno.
690 */
691 ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim));
692 ck_assert(!CORE_SEND_START (&act_s));
693 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
694 ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim));
695 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag
696 ck_assert(!CORE_SEND_END (&act_s, act_size));
697 ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
698 ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1));
699 Cache->free(act_r.out);
700 ck_assert(!CORE_RECV_ACT (&act_r, act_buf, act_size, GCS_ACT_WRITESET));
701 ck_assert_msg(-ENOTCONN == act_r.seqno,
702 "Expected -ENOTCONN, received %ld (%s)",
703 act_r.seqno, strerror (-act_r.seqno));
704
705 /*
706 * TEST CASE 5: Action is being sent and received concurrently. In between
707 * two fragments recv thread receives NON_PRIM and then PRIM components.
708 * EXPECTED OUTCOME: CORE_RECV_ACT should receive the action with -ERESTART
709 * instead of seqno.
710 */
711 ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim));
712 ck_assert(!CORE_SEND_START (&act_s));
713 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
714 usleep (100000); // make sure 1st fragment gets in before new component
715 ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, non_prim));
716 ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim));
717 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag
718 ck_assert(!CORE_SEND_END (&act_s, act_size));
719 ck_assert(!CORE_RECV_ACT (&act_r, act_buf, act_size, GCS_ACT_WRITESET));
720 ck_assert_msg(-ERESTART == act_r.seqno,
721 "Expected -ERESTART, received %ld (%s)",
722 act_r.seqno, strerror (-act_r.seqno));
723
724 /*
725 * TEST CASE 6: Action has 3 fragments, 2 were sent successfully but the
726 * 3rd failed because backend is in NON_PRIM. In addition NON_PRIM component
727 * happened in between delivered fragments.
728 * subcase 1: new component received first
729 * subcase 2: 3rd fragment is sent first
730 * EXPECTED OUTCOME: CORE_SEND_END should return -ENOTCONN after 3rd
731 * fragment send fails.
732 */
733 act = act3;
734 act_buf = act3_str;
735 act_size = sizeof(act3_str);
736 act_s.in = act;
737 act_s.size = act_size;
738
739 // subcase 1
740 ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim));
741 ck_assert(!CORE_SEND_START (&act_s));
742 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
743 ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim));
744 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag
745 usleep (500000); // fail_if_seq
746 ck_assert(!gcs_dummy_set_component(Backend, non_prim));
747 ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
748 ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1));
749 Cache->free(act_r.out);
750 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 3rd frag
751 ck_assert(!CORE_SEND_END (&act_s, -ENOTCONN));
752
753 // subcase 2
754 ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim));
755 ck_assert(!CORE_SEND_START (&act_s));
756 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
757 ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim));
758 ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag
759 usleep (1000000);
760 ck_assert(!gcs_dummy_set_component(Backend, non_prim));
761 ck_assert(!CORE_SEND_STEP (Core, 4*tout, 1)); // 3rd frag
762 ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
763 ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1));
764 Cache->free(act_r.out);
765 ck_assert(!CORE_SEND_END (&act_s, -ENOTCONN));
766
767 gu_free (prim);
768 gu_free (non_prim);
769
770 core_test_cleanup ();
771 }
772
START_TEST(gcs_core_test_own_v0)773 START_TEST (gcs_core_test_own_v0)
774 {
775 CORE_TEST_OWN(0);
776 }
777 END_TEST
778
START_TEST(gcs_core_test_own_v1)779 START_TEST (gcs_core_test_own_v1)
780 {
781 CORE_TEST_OWN(1);
782 }
783 END_TEST
784
785 #ifdef GCS_ALLOW_GH74
786 /*
787 * Disabled test because it is too slow and timeouts on crowded
788 * build systems like e.g. build.opensuse.org */
789
START_TEST(gcs_core_test_gh74)790 START_TEST (gcs_core_test_gh74)
791 {
792 gu::Config config;
793 core_test_init(&config, true, "node1");
794
795 // set frag size large enough to avoid fragmentation.
796 gu_info ("set payload size = 1024");
797 core_test_set_payload_size(1024);
798
799 // new primary comp message.
800 gcs_comp_msg_t* prim = gcs_comp_msg_new (true, false, 0, 2, 0);
801 ck_assert(NULL != prim);
802 gcs_comp_msg_add(prim, "node1", 0);
803 gcs_comp_msg_add(prim, "node2", 1);
804
805 // construct state transform request.
806 static const char* req_ptr = "12345";
807 static const size_t req_size = 6;
808 static const char* donor = ""; // from *any*
809 static const size_t donor_len = strlen(donor) + 1;
810 size_t act_size = req_size + donor_len;
811 char* act_ptr = 0;
812
813 act_ptr = (char*)gu_malloc(act_size);
814 memcpy(act_ptr, donor, donor_len);
815 memcpy(act_ptr + donor_len, req_ptr, req_size);
816
817 // serialize request into message.
818 gcs_act_frag_t frg;
819 frg.proto_ver = gcs_core_group_protocol_version(Core);
820 frg.frag_no = 0;
821 frg.act_id = 1;
822 frg.act_size = act_size;
823 frg.act_type = GCS_ACT_STATE_REQ;
824 char msg_buf[1024];
825 ck_assert(!gcs_act_proto_write(&frg, msg_buf, sizeof(msg_buf)));
826 memcpy(const_cast<void*>(frg.frag), act_ptr, act_size);
827 size_t msg_size = act_size + gcs_act_proto_hdr_size(frg.proto_ver);
828 // gu_free(act_ptr);
829
830 // state exchange message.
831 gu_uuid_t state_uuid;
832 gu_uuid_generate(&state_uuid, NULL, 0);
833 gcs_core_set_state_uuid(Core, &state_uuid);
834
835 // construct uuid message from node1.
836 size_t uuid_len = sizeof(state_uuid);
837 char uuid_buf[uuid_len];
838 memcpy(uuid_buf, &state_uuid, uuid_len);
839
840 gcs_state_msg_t* state_msg = NULL;
841 const gcs_group_t* group = gcs_core_get_group(Core);
842
843 // state exchange message from node1
844 state_msg = gcs_group_get_state(group);
845 state_msg->state_uuid = state_uuid;
846 size_t state_len = gcs_state_msg_len (state_msg);
847 char state_buf[state_len];
848 gcs_state_msg_write (state_buf, state_msg);
849 gcs_state_msg_destroy (state_msg);
850
851 // state exchange message from node2
852 state_msg = gcs_state_msg_create(&state_uuid,
853 &GU_UUID_NIL,
854 &GU_UUID_NIL,
855 GCS_SEQNO_ILL,
856 GCS_SEQNO_ILL,
857 GCS_SEQNO_ILL,
858 0,
859 GCS_NODE_STATE_NON_PRIM,
860 GCS_NODE_STATE_PRIM,
861 "node2", "127.0.0.1",
862 group->gcs_proto_ver,
863 group->repl_proto_ver,
864 group->appl_proto_ver,
865 group->prim_gcs_ver,
866 group->prim_repl_ver,
867 group->prim_appl_ver,
868 0, // desync count
869 0);
870 size_t state_len2 = gcs_state_msg_len (state_msg);
871 char state_buf2[state_len2];
872 gcs_state_msg_write (state_buf2, state_msg);
873 gcs_state_msg_destroy (state_msg);
874
875 action_t act_r(NULL, NULL, NULL, -1, (gcs_act_type_t)-1, -1, (gu_thread_t)-1);
876
877 // ========== from node1's view ==========
878 ck_assert(!gcs_dummy_set_component(Backend, prim));
879 ck_assert(!DUMMY_INJECT_COMPONENT(Backend, prim));
880 gu_free(prim);
881 CORE_RECV_START(&act_r); // we have to start another thread here.
882 // otherwise messages to node1 can not be in right order.
883 for(;;) {
884 usleep(10000); // make sure node1 already changed its status to WAIT_STATE_MSG
885 if (gcs_group_state(group) == GCS_GROUP_WAIT_STATE_MSG) {
886 break;
887 }
888 }
889 // then STR sneaks before new configuration is delivered.
890 ck_assert(gcs_dummy_inject_msg(Backend, msg_buf, msg_size,
891 GCS_MSG_ACTION, 1) == (int)msg_size);
892 // then state exchange message from node2.
893 ck_assert(gcs_dummy_inject_msg(Backend, state_buf2, state_len2,
894 GCS_MSG_STATE_MSG, 1) == (int)state_len2);
895 // expect STR is lost here.
896 ck_assert(!CORE_RECV_END(&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
897 ck_assert(!core_test_check_conf((const gcs_act_cchange_t*)act_r.out, true, 0, 2));
898 free(act_r.out);
899 core_test_cleanup();
900
901 // ========== from node2's view ==========
902 core_test_init(&config, false, "node2");
903
904 // set frag size large enough to avoid fragmentation.
905 gu_info ("set payload size = 1024");
906 core_test_set_payload_size(1024);
907
908 prim = gcs_comp_msg_new (true, false, 1, 2, 0);
909 ck_assert(NULL != prim);
910 gcs_comp_msg_add(prim, "node1", 0);
911 gcs_comp_msg_add(prim, "node2", 1);
912
913 // node1 and node2 joins.
914 // now node2's status == GCS_NODE_STATE_PRIM
915 ck_assert(!gcs_dummy_set_component(Backend, prim));
916 ck_assert(!DUMMY_INJECT_COMPONENT(Backend, prim));
917 gu_free(prim);
918 ck_assert(gcs_dummy_inject_msg(Backend, uuid_buf, uuid_len,
919 GCS_MSG_STATE_UUID, 0) == (int)uuid_len);
920 ck_assert(gcs_dummy_inject_msg(Backend, state_buf, state_len,
921 GCS_MSG_STATE_MSG, 0) == (int)state_len);
922 ck_assert(!CORE_RECV_ACT(&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
923 ck_assert(!core_test_check_conf((const gcs_act_cchange_t*)act_r.out, true, 1, 2));
924 free(act_r.out);
925
926 // then node3 joins.
927 prim = gcs_comp_msg_new (true, false, 1, 3, 0);
928 ck_assert(NULL != prim);
929 gcs_comp_msg_add(prim, "node1", 0);
930 gcs_comp_msg_add(prim, "node2", 1);
931 gcs_comp_msg_add(prim, "node3", 2);
932 ck_assert(!gcs_dummy_set_component(Backend, prim));
933 ck_assert(!DUMMY_INJECT_COMPONENT(Backend, prim));
934 gu_free(prim);
935
936 // generate a new state uuid.
937 gu_uuid_generate(&state_uuid, NULL, 0);
938 memcpy(uuid_buf, &state_uuid, uuid_len);
939
940 // state exchange message from node3
941 group = gcs_core_get_group(Core);
942 state_msg = gcs_state_msg_create(&state_uuid,
943 &GU_UUID_NIL,
944 &GU_UUID_NIL,
945 GCS_SEQNO_ILL,
946 GCS_SEQNO_ILL,
947 GCS_SEQNO_ILL,
948 0,
949 GCS_NODE_STATE_NON_PRIM,
950 GCS_NODE_STATE_PRIM,
951 "node3", "127.0.0.1",
952 group->gcs_proto_ver,
953 group->repl_proto_ver,
954 group->appl_proto_ver,
955 group->prim_gcs_ver,
956 group->prim_repl_ver,
957 group->prim_appl_ver,
958 0, // desync count
959 0);
960 size_t state_len3 = gcs_state_msg_len (state_msg);
961 char state_buf3[state_len3];
962 gcs_state_msg_write (state_buf3, state_msg);
963 gcs_state_msg_destroy (state_msg);
964
965 // updating state message from node1.
966 group = gcs_core_get_group(Core);
967 state_msg = gcs_group_get_state(group);
968 state_msg->flags = GCS_STATE_FREP | GCS_STATE_FCLA;
969 state_msg->prim_state = GCS_NODE_STATE_JOINED;
970 state_msg->current_state = GCS_NODE_STATE_SYNCED;
971 state_msg->state_uuid = state_uuid;
972 state_msg->name = "node1";
973 gcs_state_msg_write(state_buf, state_msg);
974 gcs_state_msg_destroy(state_msg);
975
976 ck_assert(gcs_dummy_inject_msg(Backend, uuid_buf, uuid_len,
977 GCS_MSG_STATE_UUID, 0) == (int)uuid_len);
978 ck_assert(gcs_dummy_inject_msg(Backend, state_buf, state_len,
979 GCS_MSG_STATE_MSG, 0) == (int)state_len);
980
981 // STR sneaks.
982 // we have to make same message exists in sender queue too.
983 // otherwise we will get following log
984 // "FIFO violation: queue empty when local action received"
985 const struct gu_buf act = {act_ptr, (ssize_t)act_size};
986 action_t act_s(&act, NULL, NULL, act_size, GCS_ACT_STATE_REQ, -1, (gu_thread_t)-1);
987 CORE_SEND_START(&act_s);
988 for(;;) {
989 usleep(10000);
990 gcs_fifo_lite_t* fifo = gcs_core_get_fifo(Core);
991 void* item = gcs_fifo_lite_get_head(fifo);
992 if (item) {
993 gcs_fifo_lite_release(fifo);
994 break;
995 }
996 }
997 ck_assert(gcs_dummy_inject_msg(Backend, msg_buf, msg_size,
998 GCS_MSG_ACTION, 1) == (int)msg_size);
999
1000 ck_assert(gcs_dummy_inject_msg(Backend, state_buf3, state_len3,
1001 GCS_MSG_STATE_MSG, 2) == (int)state_len3);
1002
1003 // expect STR and id == -EAGAIN.
1004 ck_assert(!CORE_RECV_ACT(&act_r, act_ptr, act_size, GCS_ACT_STATE_REQ));
1005 ck_assert(act_r.seqno == -EAGAIN);
1006 free(act_r.out);
1007
1008 ck_assert(!CORE_RECV_ACT(&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
1009 ck_assert(!core_test_check_conf((const gcs_act_cchange_t*)act_r.out, true, 1, 3));
1010 free(act_r.out);
1011
1012 // core_test_cleanup();
1013 // ==========
1014 gu_free(act_ptr);
1015 }
1016 END_TEST
1017 #endif /* GCS_ALLOW_GH74 */
1018
1019
1020 #if 0 // requires multinode support from gcs_dummy
1021 START_TEST (gcs_core_test_foreign)
1022 {
1023 core_test_init ();
1024
1025 core_test_cleanup ();
1026 }
1027 END_TEST
1028 #endif // 0
1029
gcs_core_suite(void)1030 Suite *gcs_core_suite(void)
1031 {
1032 Suite *suite = suite_create("GCS core context");
1033 TCase *tcase = tcase_create("gcs_core");
1034
1035 suite_add_tcase (suite, tcase);
1036 tcase_set_timeout(tcase, 60);
1037 bool skip = false;
1038
1039 if (skip == false) {
1040 tcase_add_test (tcase, gcs_code_msg);
1041 tcase_add_test (tcase, gcs_core_test_api);
1042 tcase_add_test (tcase, gcs_core_test_own_v0);
1043 tcase_add_test (tcase, gcs_core_test_own_v1);
1044 #ifdef GCS_ALLOW_GH74
1045 tcase_add_test (tcase, gcs_core_test_gh74);
1046 #endif /* GCS_ALLOW_GH74 */
1047 // tcase_add_test (tcase, gcs_core_test_foreign);
1048 }
1049 return suite;
1050 }
1051