1 /*
2  * Copyright (C) 2008-2020 Codership Oy <info@codership.com>
3  *
4  * $Id$
5  */
6 
/*
 * @file
 *
 * Defines unit tests for gcs_core (and as a result tests gcs_group and
 * a dummy backend which gcs_core depends on)
 *
 * Most of the checks require independent sending and receiving threads.
 * Approach 1 is to start separate threads for both sending and receiving
 * and use the current thread of execution to synchronize between them:
 *
 * CORE_RECV_START(act_r)
 * CORE_SEND_START(act_s)
 * while (gcs_core_send_step(Core)) {  // step through action fragments
 *     (do something)
 * };
 * CORE_SEND_END(act_s, ret)          // check return code
 * CORE_RECV_END(act_r, size, type)   // makes checks against size and type
 *
 * A simplified approach 2 is:
 *
 * CORE_SEND_START(act_s)
 * while (gcs_core_send_step(Core)) {  // step through action fragments
 *     (do something)
 * };
 * CORE_SEND_END(act_s, ret)          // check return code
 * CORE_RECV_ACT(act_r, size, type)   // makes checks against size and type
 *
 * In the first approach group messages will be received concurrently.
 * In the second approach messages will wait in the queue and be fetched
 * afterwards.
 *
 */
38 
39 #define GCS_STATE_MSG_ACCESS
40 #include "../gcs_core.hpp"
41 #include "../gcs_dummy.hpp"
42 #include "../gcs_seqno.hpp"
43 #include "../gcs_state_msg.hpp"
44 #include "../gcs_code_msg.hpp"
45 
46 #include <galerautils.h>
47 #include "gu_config.hpp"
48 
49 #include "gcs_test_utils.hpp"
50 #include "gcs_core_test.hpp" // must be included last
51 
START_TEST(gcs_code_msg)52 START_TEST(gcs_code_msg)
53 {
54     gu::UUID    const u0(NULL, 0);
55     gcs_seqno_t const s0(1234);
56     uint64_t    const c0(4312);
57 
58     gcs::core::CodeMsg cm0(gu::GTID(u0, s0), c0);
59 
60     const void* const buf(cm0());
61 
62     const gcs::core::CodeMsg* const cm1
63         (static_cast<const gcs::core::CodeMsg*>(buf));
64 
65     gu::UUID    const u1(cm1->uuid());
66     gcs_seqno_t const s1(cm1->seqno());
67     uint64_t    const c1(cm1->code());
68 
69     ck_assert(u0 == u1);
70     ck_assert(s0 == s1);
71     ck_assert(c0 == c1);
72 }
73 END_TEST
74 
75 extern ssize_t gcs_tests_get_allocated();
76 
77 static const long UNKNOWN_SIZE = 1234567890; // some unrealistic number
78 
79 static std::string const CacheName("core_test.cache");
80 static gcache::GCache* Cache   = NULL;
81 static gcs_core_t*     Core    = NULL;
82 static gcs_backend_t*  Backend = NULL;
83 static gcs_seqno_t     Seqno   = 0;
84 static gu::UUID        Uuid;
85 
86 typedef struct action {
87     const struct gu_buf* in;
88     void*                out;
89     const void*          local;
90     ssize_t              size;
91     gcs_act_type_t       type;
92     gcs_seqno_t          seqno;
93     gu_thread_t          thread;
94 
actionaction95     action() { }
actionaction96     action(const struct gu_buf* a_in,
97            void*                a_out,
98            const void*          a_local,
99            ssize_t              a_size,
100            gcs_act_type_t       a_type,
101            gcs_seqno_t          a_seqno,
102            gu_thread_t          a_thread)
103         :
104         in     (a_in),
105         out    (a_out),
106         local  (a_local),
107         size   (a_size),
108         type   (a_type),
109         seqno  (a_seqno),
110         thread (a_thread)
111     { }
112 } action_t;
113 
114 //static struct action_t RecvAct;
115 static const ssize_t FRAG_SIZE = 4; // desirable action fragment size
116 
117 // 1-fragment action
118 static const char act1_str[] = "101";
119 static const struct gu_buf act1[1] = {
120     { act1_str, sizeof(act1_str) }
121 };
122 
123 // 2-fragment action, with buffers aligned with FRAG_SIZE
124 static const char act2_str[] = "202122";
125 static const struct gu_buf act2[2] = {
126     { "2021", 4 },
127     { "22",   3 } /* 4 + 3 = 7 = sizeof(act2_str) */
128 };
129 
130 // 3-fragment action, with unaligned buffers
131 static const char act3_str[] = "3031323334";
132 static const struct gu_buf act3[] = {
133     { "303", 3 },
134     { "13",  2 },
135     { "23",  2 },
136     { "334", 4 } /* 3 + 2 + 2 + 4 = 11 = sizeof(act3_str) */
137 };
138 
139 
140 // action receive thread, returns after first action received, stores action
141 // in the passed action_t object, uses global Core to receive
142 static void*
core_recv_thread(void * arg)143 core_recv_thread (void* arg)
144 {
145     action_t* act = (action_t*)arg;
146 
147     // @todo: refactor according to new gcs_act types
148     struct gcs_act_rcvd recv_act;
149 
150     act->size  = gcs_core_recv (Core, &recv_act, GU_TIME_ETERNITY);
151     act->out   = (void*)recv_act.act.buf;
152     act->local = recv_act.local;
153     act->type  = recv_act.act.type;
154     act->seqno = recv_act.id;
155 
156     return (NULL);
157 }
158 
// Reports a failure from within a bool-returning helper function: when expr
// holds, the message is logged via gu_fatal(), a check failure is raised
// and the enclosing function returns true.
// The do { } while (0) wrapper makes the macro behave as a single statement
// (safe inside unbraced if/else); it must be followed by a semicolon.
// At least one argument is required after the format string.
#define FAIL_IF(expr, format, ...)                                      \
    do {                                                                \
        if (expr) {                                                     \
            gu_fatal ("FAIL: " format, __VA_ARGS__);                    \
            ck_assert_msg(false, format, __VA_ARGS__);                  \
            return true;                                                \
        }                                                               \
    } while (0)
166 
167 // Start a thread to receive an action
168 // args: action_t object
CORE_RECV_START(action_t * act)169 static inline bool CORE_RECV_START(action_t* act)
170 {
171     return (0 != gu_thread_create (&act->thread, NULL,
172                                    core_recv_thread, act));
173 }
174 
COMMON_RECV_CHECKS(action_t * act,const char * buf,int size,gcs_act_type_t type,gcs_seqno_t * seqno)175 static bool COMMON_RECV_CHECKS(action_t*      act,
176                                const char*    buf,
177                                int            size,
178                                gcs_act_type_t type,
179                                gcs_seqno_t*   seqno)
180 {
181     FAIL_IF (size != UNKNOWN_SIZE && size != act->size,
182              "gcs_core_recv(): expected size %d, returned %zd (%s)",
183              size, act->size, strerror (-act->size));
184     FAIL_IF (act->type != type,
185              "type does not match: expected %d, got %d", type, act->type);
186     FAIL_IF (act->size > 0 && act->out == NULL,
187              "null buffer received with positive size: %zu", act->size);
188 
189     if (act->type == GCS_ACT_STATE_REQ) return false;
190 
191     // action is ordered only if it is of type GCS_ACT_WRITESET or
192     // GCS_ACT_CCHANGE and not an error
193     if (act->seqno > GCS_SEQNO_ILL) {
194         FAIL_IF (GCS_ACT_WRITESET != act->type && GCS_ACT_CCHANGE != act->type
195                  && act->seqno > 0,
196                  "GCS_ACT_WRITESET != act->type (%d), while act->seqno: %lld",
197                  act->type, (long long)act->seqno);
198 
199         if (GCS_ACT_WRITESET == act->type)
200         {
201             assert((*seqno + 1) == act->seqno);
202             FAIL_IF ((*seqno + 1) != act->seqno,
203                      "expected seqno %lld, got %lld",
204                      (long long)(*seqno + 1), (long long)act->seqno);
205             *seqno = *seqno + 1;
206         }
207         else if (GCS_ACT_CCHANGE == act->type)
208         {
209             FAIL_IF (act->seqno < 0, "Negative seqno: %lld",
210                      (long long)act->seqno);
211 
212             Uuid = gcs_core_get_group(Core)->group_uuid;
213 
214             if (gcs_core_proto_ver(Core) >= 1) *seqno = *seqno + 1;
215         }
216     }
217 
218     if (NULL != buf) {
219         if (GCS_ACT_WRITESET == act->type) {
220             // local action buffer should not be copied
221             ck_assert_msg(act->local == act->in,
222                           "Received buffer ptr is not the same as sent: "
223                           "%p != %p", act->in, act->local);
224             ck_assert_msg(!memcmp(buf, act->out, act->size),
225                           "Received buffer contents is not the same as sent: "
226                           "'%s' != '%s'", buf, (char*)act->out);
227         }
228         else {
229             ck_assert_msg(act->local != buf,
230                           "Received the same buffer ptr as sent");
231             ck_assert_msg(!memcmp(buf, act->out, act->size),
232                           "Received buffer contents is not the same as sent");
233         }
234     }
235 
236     return false;
237 }
238 
239 // Wait for recv thread to complete, perform required checks
240 // args: action_t, expected size, expected type
CORE_RECV_END(action_t * act,const void * buf,ssize_t size,gcs_act_type_t type)241 static bool CORE_RECV_END(action_t*      act,
242                           const void*    buf,
243                           ssize_t        size,
244                           gcs_act_type_t type)
245 {
246     {
247         int ret = gu_thread_join (act->thread, NULL);
248         act->thread = (gu_thread_t)-1;
249         FAIL_IF(0 != ret, "Failed to join recv thread: %d (%s)",
250                 ret, strerror (ret));
251     }
252 
253     return COMMON_RECV_CHECKS (act, (const char*)buf, size, type, &Seqno);
254 }
255 
256 // Receive action in one call, perform required checks
257 // args: pointer to action_t, expected size, expected type
CORE_RECV_ACT(action_t * act,const void * buf,ssize_t size,gcs_act_type_t type)258 static bool CORE_RECV_ACT (action_t*      act,
259                            const void*    buf,  // single buffer action repres.
260                            ssize_t        size,
261                            gcs_act_type_t type)
262 {
263     struct gcs_act_rcvd recv_act;
264 
265     act->size  = gcs_core_recv (Core, &recv_act, GU_TIME_ETERNITY);
266     act->out   = (void*)recv_act.act.buf;
267     act->local = recv_act.local;
268     act->type  = recv_act.act.type;
269     act->seqno = recv_act.id;
270 
271     return COMMON_RECV_CHECKS (act, (const char*)buf, size, type, &Seqno);
272 }
273 
274 // Sending always needs to be done via separate thread (uses lock-stepping)
275 void*
core_send_thread(void * arg)276 core_send_thread (void* arg)
277 {
278     action_t* act = (action_t*)arg;
279 
280     // use seqno field to pass the return code, it is signed 8-byte integer
281     act->seqno = gcs_core_send (Core, act->in, act->size, act->type);
282 
283     return (NULL);
284 }
285 
286 // Start a thread to send an action
287 // args: action_t object
CORE_SEND_START(action_t * act)288 static bool CORE_SEND_START(action_t* act)
289 {
290     return (0 != gu_thread_create (&act->thread, NULL,
291                                    core_send_thread, act));
292 }
293 
294 // Wait for send thread to complete, perform required checks
295 // args: action_t, expected return code
CORE_SEND_END(action_t * act,long ret)296 static bool CORE_SEND_END(action_t* act, long ret)
297 {
298     {
299         long _ret = gu_thread_join (act->thread, NULL);
300         act->thread = (gu_thread_t)-1;
301         ck_assert_msg(0 == _ret, "Failed to join recv thread: %ld (%s)",
302                       _ret, strerror (_ret));
303     }
304 
305     ck_assert_msg(ret == act->seqno,
306                   "gcs_core_send(): expected %lld, returned %lld (%s)",
307                   (long long) ret, (long long) act->seqno, strerror (-act->seqno));
308 
309     return false;
310 }
311 
312 // check if configuration is the one that we expected
313 static long
core_test_check_conf(const void * const conf_msg,int const conf_size,bool const prim,long const my_idx,size_t const memb_num)314 core_test_check_conf (const void* const conf_msg, int const conf_size,
315                       bool const prim, long const my_idx, size_t const memb_num)
316 {
317     long ret = 0;
318 
319     gcs_act_cchange const conf(conf_msg, conf_size);
320 
321     if ((conf.conf_id >= 0) != prim) {
322         gu_error ("Expected %s conf, received %s",
323                   prim ? "PRIMARY" : "NON-PRIMARY",
324                   (conf.conf_id >= 0) ? "PRIMARY" : "NON-PRIMARY");
325         ret = -1;
326     }
327 
328     if (conf.memb.size() != memb_num) {
329         gu_error ("Expected memb_num = %zd, got %zd", memb_num,conf.memb.size());
330         ret = -1;
331     }
332 
333     return ret;
334 }
335 
336 static long
core_test_set_payload_size(ssize_t s)337 core_test_set_payload_size (ssize_t s)
338 {
339     long          ret;
340     const ssize_t arbitrary_pkt_size = s + 64; // big enough for payload to fit
341 
342     ret = gcs_core_set_pkt_size (Core, arbitrary_pkt_size);
343     if (ret <= 0) {
344         gu_error("set_pkt_size(%zd) returned: %ld (%s)", arbitrary_pkt_size,
345                  ret, strerror (-ret));
346         return ret;
347     }
348 
349     ret = gcs_core_set_pkt_size (Core, arbitrary_pkt_size - ret + s);
350     if (ret != s) {
351         gu_error("set_pkt_size() returned: %ld instead of %zd", ret, s);
352         return ret;
353     }
354 
355     return 0;
356 }
357 
358 // Initialises core and backend objects + some common tests
359 static inline void
core_test_init(gu::Config * config,bool bootstrap=true,int const gcs_proto_ver=1)360 core_test_init (gu::Config* config,
361                 bool bootstrap = true, int const gcs_proto_ver = 1)
362 {
363     long     ret;
364     action_t act;
365 
366     mark_point();
367 
368     ck_assert(config != NULL);
369 
370     gcs_test::InitConfig(*config, CacheName);
371 
372     Cache = new gcache::GCache(*config, ".");
373 
374     Core = gcs_core_create (reinterpret_cast<gu_config_t*>(config),
375                             reinterpret_cast<gcache_t*>(Cache),
376                             "core_test", "aaa.bbb.ccc.ddd:xxxx", 0, 0,
377                             gcs_proto_ver);
378 
379     ck_assert(NULL != Core);
380 
381     Backend = gcs_core_get_backend (Core);
382     ck_assert(NULL != Backend);
383 
384     Seqno = 0; // reset seqno
385 
386     ret = core_test_set_payload_size (FRAG_SIZE);
387     ck_assert_msg(-EBADFD == ret, "Expected -EBADFD, got: %ld (%s)",
388                   ret, strerror(-ret));
389 
390     ret = gcs_core_open (Core, "yadda-yadda", "owkmevc", 1);
391     ck_assert_msg(-EINVAL == ret, "Expected -EINVAL, got %ld (%s)",
392                   ret, strerror(-ret));
393 
394     ret = gcs_core_open (Core, "yadda-yadda", "dummy://", bootstrap);
395     ck_assert_msg(0 == ret, "Failed to open core connection: %ld (%s)",
396                   ret, strerror(-ret));
397 
398     if (!bootstrap) {
399         gcs_core_send_lock_step (Core, true);
400         mark_point();
401         return;
402     }
403 
404     // receive first configuration message
405     ck_assert(!CORE_RECV_ACT (&act, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
406     ck_assert(!core_test_check_conf(act.out, act.size, bootstrap, 0, 1));
407     Cache->free(act.out);
408 
409     int const ver(gcs_core_proto_ver(Core));
410     ck_assert_msg(ver == gcs_proto_ver,"Expected protocol version: %d, got: %d",
411                   gcs_proto_ver, ver);
412 
413     // this will configure backend to have desired fragment size
414     ret = core_test_set_payload_size (FRAG_SIZE);
415     ck_assert_msg(0 == ret, "Failed to set up the message payload size: %ld (%s)",
416                   ret, strerror(-ret));
417 
418     // try to send an action to check that everything's alright
419     ret = gcs_core_send (Core, act1, sizeof(act1_str), GCS_ACT_WRITESET);
420     ck_assert_msg(ret == sizeof(act1_str), "Expected %zu, got %ld (%s)",
421                   sizeof(act1_str), ret, strerror (-ret));
422     gu_warn ("Next CORE_RECV_ACT fails under valgrind");
423     act.in = act1;
424     ck_assert(!CORE_RECV_ACT(&act, act1_str, sizeof(act1_str), GCS_ACT_WRITESET));
425 
426     ret = gcs_core_send_join (Core, gu::GTID(Uuid, Seqno), 0);
427     ck_assert_msg(ret >= 0, "gcs_core_send_join(): %ld (%s)",
428                   ret, strerror(-ret));
429     // no action to be received (we're joined already)
430 
431     ret = gcs_core_send_sync (Core, gu::GTID(Uuid, Seqno));
432 
433     int const proto(gcs_core_proto_ver(Core));
434     ck_assert(proto == gcs_proto_ver); // checking just in case
435 
436     int const expected_ret
437         (proto >= 1 ? gcs::core::CodeMsg::serial_size() : sizeof(gcs_seqno_t));
438     ck_assert_msg(ret == expected_ret,
439              "gcs_core_send_sync(): %ld (%s)", ret, strerror(-ret));
440 
441     ck_assert(!CORE_RECV_ACT(&act, NULL, sizeof(gcs_seqno_t), GCS_ACT_SYNC));
442 
443     gcs_seqno_t const s(gcs_seqno_gtoh(*(gcs_seqno_t*)act.out));
444 
445     int const expected_s(proto >= 1 ? 0 : Seqno);
446     ck_assert_msg(s == expected_s, "Expected code %lld, got %lld",
447                   (long long)expected_s, (long long)s);
448 
449     gcs_core_send_lock_step (Core, true);
450     mark_point();
451 }
452 
453 // cleans up core and backend objects
454 static inline void
core_test_cleanup()455 core_test_cleanup ()
456 {
457     long      ret;
458     char      tmp[1];
459     action_t  act;
460 
461     ck_assert(NULL != Core);
462     ck_assert(NULL != Backend);
463 
464     // to fetch self-leave message
465     ck_assert(!CORE_RECV_START (&act));
466     ret = gcs_core_close (Core);
467     ck_assert_msg(0 == ret, "Failed to close core: %ld (%s)",
468                   ret, strerror (-ret));
469     ret = CORE_RECV_END (&act, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE);
470     ck_assert_msg(0 == ret, "ret: %ld (%s)", ret, strerror(-ret));
471     Cache->free(act.out);
472 
473     // check that backend is closed too
474     ret = Backend->send (Backend, tmp, sizeof(tmp), GCS_MSG_ACTION);
475     ck_assert(ret == -EBADFD);
476 
477     ret = gcs_core_destroy (Core);
478     ck_assert_msg(0 == ret, "Failed to destroy core: %ld (%s)",
479                   ret, strerror (-ret));
480 
481     {
482         ssize_t   allocated;
483         allocated = gcs_tests_get_allocated();
484         ck_assert_msg(0 == allocated,
485                       "Expected 0 allocated bytes, found %zd", allocated);
486     }
487 
488     delete Cache;
489     ::unlink(CacheName.c_str());
490 }
491 
492 // just a smoke test for core API
START_TEST(gcs_core_test_api)493 START_TEST (gcs_core_test_api)
494 {
495     gu::Config config;
496     core_test_init (&config);
497 
498     ck_assert(NULL != Cache);
499     ck_assert(NULL != Core);
500     ck_assert(NULL != Backend);
501 
502     long     ret;
503     long     tout = 100; // 100 ms timeout
504     const struct gu_buf* act = act3;
505     const void* act_buf  = act3_str;
506     size_t      act_size = sizeof(act3_str);
507 
508     action_t act_s(act, NULL, NULL, act_size, GCS_ACT_WRITESET, -1, (gu_thread_t)-1);
509     action_t act_r(act, NULL, NULL, -1, (gcs_act_type_t)-1, -1, (gu_thread_t)-1);
510     long i = 5;
511 
512     // test basic fragmentaiton
513     while (i--) {
514         long     frags    = (act_size - 1)/FRAG_SIZE + 1;
515 
516         gu_info ("Iteration %ld: act: %s, size: %zu, frags: %ld",
517                  i, act, act_size, frags);
518 
519         ck_assert(!CORE_SEND_START (&act_s));
520 
521         while ((ret = gcs_core_send_step (Core, 3*tout)) > 0) {
522             frags--; gu_info ("frags: %ld", frags);
523 //            usleep (1000);
524         }
525 
526         ck_assert_msg(ret == 0, "gcs_core_send_step() returned: %ld (%s)",
527                       ret, strerror(-ret));
528         ck_assert_msg(frags == 0, "frags = %ld, instead of 0", frags);
529         ck_assert(!CORE_SEND_END (&act_s, act_size));
530         ck_assert(!CORE_RECV_ACT (&act_r, act_buf, act_size, GCS_ACT_WRITESET));
531 
532         ret = gcs_core_set_last_applied (Core, gu::GTID(Uuid, Seqno));
533         ck_assert_msg(ret >= 0, "gcs_core_set_last_applied(): %ld (%s)",
534                       ret, strerror(-ret));
535         /* commit cut action size should be 8 */
536         ck_assert(!CORE_RECV_ACT (&act_r, NULL, 8, GCS_ACT_COMMIT_CUT));
537         ck_assert(Seqno == gcs_seqno_gtoh(*(gcs_seqno_t*)act_r.out));
538         free(act_r.out); // commit cut is allocated by malloc()
539     }
540 
541     // send fake flow control action, its contents is not important
542     gcs_core_send_fc (Core, act, act_size);
543     ck_assert_msg(ret >= 0, "gcs_core_send_fc(): %ld (%s)",
544                   ret, strerror(-ret));
545     ck_assert(!CORE_RECV_ACT(&act_r, act, act_size, GCS_ACT_FLOW));
546 
547     core_test_cleanup ();
548 }
549 END_TEST
550 
551 // do a single send step, compare with the expected result
552 static inline bool
CORE_SEND_STEP(gcs_core_t * core,long timeout,long ret)553 CORE_SEND_STEP (gcs_core_t* core, long timeout, long ret)
554 {
555    long err = gcs_core_send_step (core, timeout);
556    ck_assert_msg(err >= 0, "gcs_core_send_step(): %ld (%s)",
557                  err, strerror (-err));
558    if (ret >= 0) {
559        ck_assert_msg(err == ret, "gcs_core_send_step(): expected %ld, got %ld",
560                      ret, err);
561    }
562 
563    return false;
564 }
565 
566 static bool
DUMMY_INJECT_COMPONENT(gcs_backend_t * backend,const gcs_comp_msg_t * comp)567 DUMMY_INJECT_COMPONENT (gcs_backend_t* backend, const gcs_comp_msg_t* comp)
568 {
569     long ret = gcs_dummy_inject_msg (Backend, comp,
570                                      gcs_comp_msg_size(comp),
571                                      GCS_MSG_COMPONENT, GCS_SENDER_NONE);
572     ck_assert_msg(ret > 0, "gcs_dummy_inject_msg(): %ld (%s)",
573                   ret, strerror(ret));
574 
575     return false;
576 }
577 
578 static bool
DUMMY_INSTALL_COMPONENT(gcs_backend_t * backend,const gcs_comp_msg_t * comp)579 DUMMY_INSTALL_COMPONENT (gcs_backend_t* backend, const gcs_comp_msg_t* comp)
580 {
581     bool primary = gcs_comp_msg_primary (comp);
582     long my_idx  = gcs_comp_msg_self    (comp);
583     long members = gcs_comp_msg_num     (comp);
584 
585     action_t act;
586 
587     FAIL_IF (gcs_dummy_set_component(Backend, comp), "%s",
588              "gcs_dummy_set_component");
589     FAIL_IF (DUMMY_INJECT_COMPONENT (Backend, comp), "%s",
590              "DUMMT_INJECT_COMPONENT");
591     FAIL_IF (CORE_RECV_ACT (&act, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE),
592              "%s", "CORE_RECV_ACT");
593     FAIL_IF (core_test_check_conf(act.out, act.size, primary, my_idx, members),
594              "%s", "core_test_check_conf");
595     Cache->free(act.out);
596     return false;
597 }
598 
599 static void
CORE_TEST_OWN(int gcs_proto_ver)600 CORE_TEST_OWN (int gcs_proto_ver)
601 {
602     long const tout = 1000; // 100 ms timeout
603 
604     const struct gu_buf* act      = act2;
605     const void*          act_buf  = act2_str;
606     size_t               act_size = sizeof(act2_str);
607 
608     action_t act_s(act, NULL, NULL, act_size, GCS_ACT_WRITESET, -1, (gu_thread_t)-1);
609     action_t act_r(act, NULL, NULL, -1, (gcs_act_type_t)-1, -1, (gu_thread_t)-1);
610 
611     // Create primary and non-primary component messages
612     gcs_comp_msg_t* prim     = gcs_comp_msg_new (true, false,  0, 1, 0);
613     gcs_comp_msg_t* non_prim = gcs_comp_msg_new (false, false, 0, 1, 0);
614     ck_assert(NULL != prim);
615     ck_assert(NULL != non_prim);
616     gcs_comp_msg_add (prim,     "node1", 0);
617     gcs_comp_msg_add (non_prim, "node1", 1);
618 
619     gu::Config config;
620     core_test_init (&config, true, gcs_proto_ver);
621 
622     /////////////////////////////////////////////
623     /// check behaviour in transitional state ///
624     /////////////////////////////////////////////
625 
626     ck_assert(!CORE_RECV_START (&act_r));
627     ck_assert(!CORE_SEND_START (&act_s));
628     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
629     usleep (10000); // resolve race between sending and setting transitional
630     gcs_dummy_set_transitional (Backend);
631     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag
632     ck_assert(!CORE_SEND_STEP (Core, tout, 0)); // no frags left
633     ck_assert(NULL == act_r.out); // should not have received anything
634     ck_assert(!gcs_dummy_set_component (Backend, prim)); // return to PRIM state
635     ck_assert(!CORE_SEND_END (&act_s, act_size));
636     ck_assert(!CORE_RECV_END (&act_r, act_buf, act_size, GCS_ACT_WRITESET));
637 
638     /*
639      * TEST CASE 1: Action was sent successfully, but NON_PRIM component
640      * happened before any fragment could be delivered.
641      * EXPECTED OUTCOME: action is received with -ENOTCONN instead of global
642      * seqno
643      */
644     ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim));
645     ck_assert(!CORE_SEND_START (&act_s));
646     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
647     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag
648     ck_assert(!CORE_SEND_END (&act_s, act_size));
649     ck_assert(!gcs_dummy_set_component(Backend, non_prim));
650     ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
651     ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1));
652     Cache->free(act_r.out);
653     ck_assert(!CORE_RECV_ACT (&act_r, act_buf, act_size, GCS_ACT_WRITESET));
654     ck_assert_msg(-ENOTCONN == act_r.seqno,
655                   "Expected -ENOTCONN, received %ld (%s)",
656                   act_r.seqno, strerror (-act_r.seqno));
657 
658     /*
659      * TEST CASE 2: core in NON_PRIM state. There is attempt to send an
660      * action.
661      * EXPECTED OUTCOME: CORE_SEND_END should return -ENOTCONN after 1st
662      * fragment send fails.
663      */
664     ck_assert(!CORE_SEND_START (&act_s));
665     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
666     ck_assert(!CORE_SEND_STEP (Core, tout, 0)); // bail out after 1st frag
667     ck_assert(!CORE_SEND_END (&act_s, -ENOTCONN));
668 
669     /*
670      * TEST CASE 3: Backend in NON_PRIM state. There is attempt to send an
671      * action.
672      * EXPECTED OUTCOME: CORE_SEND_END should return -ENOTCONN after 1st
673      * fragment send fails.
674      */
675     ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim));
676     ck_assert(!gcs_dummy_set_component(Backend, non_prim));
677     ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim));
678     ck_assert(!CORE_SEND_START (&act_s));
679     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
680     ck_assert(!CORE_SEND_END (&act_s, -ENOTCONN));
681     ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
682     ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1));
683     Cache->free(act_r.out);
684 
685     /*
686      * TEST CASE 4: Action was sent successfully, but NON_PRIM component
687      * happened in between delivered fragments.
688      * EXPECTED OUTCOME: action is received with -ENOTCONN instead of global
689      * seqno.
690      */
691     ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim));
692     ck_assert(!CORE_SEND_START (&act_s));
693     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
694     ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim));
695     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag
696     ck_assert(!CORE_SEND_END (&act_s, act_size));
697     ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
698     ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1));
699     Cache->free(act_r.out);
700     ck_assert(!CORE_RECV_ACT (&act_r, act_buf, act_size, GCS_ACT_WRITESET));
701     ck_assert_msg(-ENOTCONN == act_r.seqno,
702                   "Expected -ENOTCONN, received %ld (%s)",
703                   act_r.seqno, strerror (-act_r.seqno));
704 
705     /*
706      * TEST CASE 5: Action is being sent and received concurrently. In between
707      * two fragments recv thread receives NON_PRIM and then PRIM components.
708      * EXPECTED OUTCOME: CORE_RECV_ACT should receive the action with -ERESTART
709      * instead of seqno.
710      */
711     ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim));
712     ck_assert(!CORE_SEND_START (&act_s));
713     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
714     usleep (100000); // make sure 1st fragment gets in before new component
715     ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, non_prim));
716     ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim));
717     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag
718     ck_assert(!CORE_SEND_END (&act_s, act_size));
719     ck_assert(!CORE_RECV_ACT (&act_r, act_buf, act_size, GCS_ACT_WRITESET));
720     ck_assert_msg(-ERESTART == act_r.seqno,
721                   "Expected -ERESTART, received %ld (%s)",
722                   act_r.seqno, strerror (-act_r.seqno));
723 
724     /*
725      * TEST CASE 6: Action has 3 fragments, 2 were sent successfully but the
726      * 3rd failed because backend is in NON_PRIM. In addition NON_PRIM component
727      * happened in between delivered fragments.
728      * subcase 1: new component received first
729      * subcase 2: 3rd fragment is sent first
730      * EXPECTED OUTCOME: CORE_SEND_END should return -ENOTCONN after 3rd
731      * fragment send fails.
732      */
733     act        = act3;
734     act_buf    = act3_str;
735     act_size   = sizeof(act3_str);
736     act_s.in   = act;
737     act_s.size = act_size;
738 
739     // subcase 1
740     ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim));
741     ck_assert(!CORE_SEND_START (&act_s));
742     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
743     ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim));
744     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag
745     usleep (500000); // fail_if_seq
746     ck_assert(!gcs_dummy_set_component(Backend, non_prim));
747     ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
748     ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1));
749     Cache->free(act_r.out);
750     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 3rd frag
751     ck_assert(!CORE_SEND_END (&act_s, -ENOTCONN));
752 
753     // subcase 2
754     ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim));
755     ck_assert(!CORE_SEND_START (&act_s));
756     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag
757     ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim));
758     ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag
759     usleep (1000000);
760     ck_assert(!gcs_dummy_set_component(Backend, non_prim));
761     ck_assert(!CORE_SEND_STEP (Core, 4*tout, 1)); // 3rd frag
762     ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
763     ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1));
764     Cache->free(act_r.out);
765     ck_assert(!CORE_SEND_END (&act_s, -ENOTCONN));
766 
767     gu_free (prim);
768     gu_free (non_prim);
769 
770     core_test_cleanup ();
771 }
772 
START_TEST(gcs_core_test_own_v0)773 START_TEST (gcs_core_test_own_v0)
774 {
775     CORE_TEST_OWN(0);
776 }
777 END_TEST
778 
START_TEST(gcs_core_test_own_v1)779 START_TEST (gcs_core_test_own_v1)
780 {
781     CORE_TEST_OWN(1);
782 }
783 END_TEST
784 
785 #ifdef GCS_ALLOW_GH74
786 /*
787  * Disabled test because it is too slow and timeouts on crowded
788  * build systems like e.g. build.opensuse.org */
789 
START_TEST(gcs_core_test_gh74)790 START_TEST (gcs_core_test_gh74)
791 {
792     gu::Config config;
793     core_test_init(&config, true, "node1");
794 
795     // set frag size large enough to avoid fragmentation.
796     gu_info ("set payload size = 1024");
797     core_test_set_payload_size(1024);
798 
799     // new primary comp message.
800     gcs_comp_msg_t* prim = gcs_comp_msg_new (true, false, 0, 2, 0);
801     ck_assert(NULL != prim);
802     gcs_comp_msg_add(prim, "node1", 0);
803     gcs_comp_msg_add(prim, "node2", 1);
804 
805     // construct state transform request.
806     static const char* req_ptr = "12345";
807     static const size_t req_size = 6;
808     static const char* donor = ""; // from *any*
809     static const size_t donor_len = strlen(donor) + 1;
810     size_t act_size = req_size + donor_len;
811     char* act_ptr = 0;
812 
813     act_ptr = (char*)gu_malloc(act_size);
814     memcpy(act_ptr, donor, donor_len);
815     memcpy(act_ptr + donor_len, req_ptr, req_size);
816 
817     // serialize request into message.
818     gcs_act_frag_t frg;
819     frg.proto_ver = gcs_core_group_protocol_version(Core);
820     frg.frag_no = 0;
821     frg.act_id = 1;
822     frg.act_size = act_size;
823     frg.act_type = GCS_ACT_STATE_REQ;
824     char msg_buf[1024];
825     ck_assert(!gcs_act_proto_write(&frg, msg_buf, sizeof(msg_buf)));
826     memcpy(const_cast<void*>(frg.frag), act_ptr, act_size);
827     size_t msg_size = act_size + gcs_act_proto_hdr_size(frg.proto_ver);
828     // gu_free(act_ptr);
829 
830     // state exchange message.
831     gu_uuid_t state_uuid;
832     gu_uuid_generate(&state_uuid, NULL, 0);
833     gcs_core_set_state_uuid(Core, &state_uuid);
834 
835     // construct uuid message from node1.
836     size_t uuid_len = sizeof(state_uuid);
837     char uuid_buf[uuid_len];
838     memcpy(uuid_buf, &state_uuid, uuid_len);
839 
840     gcs_state_msg_t* state_msg = NULL;
841     const gcs_group_t* group = gcs_core_get_group(Core);
842 
843     // state exchange message from node1
844     state_msg = gcs_group_get_state(group);
845     state_msg->state_uuid = state_uuid;
846     size_t state_len = gcs_state_msg_len (state_msg);
847     char state_buf[state_len];
848     gcs_state_msg_write (state_buf, state_msg);
849     gcs_state_msg_destroy (state_msg);
850 
851     // state exchange message from node2
852     state_msg = gcs_state_msg_create(&state_uuid,
853                                      &GU_UUID_NIL,
854                                      &GU_UUID_NIL,
855                                      GCS_SEQNO_ILL,
856                                      GCS_SEQNO_ILL,
857                                      GCS_SEQNO_ILL,
858                                      0,
859                                      GCS_NODE_STATE_NON_PRIM,
860                                      GCS_NODE_STATE_PRIM,
861                                      "node2", "127.0.0.1",
862                                      group->gcs_proto_ver,
863                                      group->repl_proto_ver,
864                                      group->appl_proto_ver,
865                                      group->prim_gcs_ver,
866                                      group->prim_repl_ver,
867                                      group->prim_appl_ver,
868                                      0, // desync count
869                                      0);
870     size_t state_len2 = gcs_state_msg_len (state_msg);
871     char state_buf2[state_len2];
872     gcs_state_msg_write (state_buf2, state_msg);
873     gcs_state_msg_destroy (state_msg);
874 
875     action_t act_r(NULL,  NULL, NULL, -1, (gcs_act_type_t)-1, -1, (gu_thread_t)-1);
876 
877     // ========== from node1's view ==========
878     ck_assert(!gcs_dummy_set_component(Backend, prim));
879     ck_assert(!DUMMY_INJECT_COMPONENT(Backend, prim));
880     gu_free(prim);
881     CORE_RECV_START(&act_r); // we have to start another thread here.
882     // otherwise messages to node1 can not be in right order.
883     for(;;) {
884         usleep(10000); // make sure node1 already changed its status to WAIT_STATE_MSG
885         if (gcs_group_state(group) == GCS_GROUP_WAIT_STATE_MSG) {
886             break;
887         }
888     }
889     // then STR sneaks before new configuration is delivered.
890     ck_assert(gcs_dummy_inject_msg(Backend, msg_buf, msg_size,
891                                    GCS_MSG_ACTION, 1) == (int)msg_size);
892     // then state exchange message from node2.
893     ck_assert(gcs_dummy_inject_msg(Backend, state_buf2, state_len2,
894                                    GCS_MSG_STATE_MSG, 1) == (int)state_len2);
895     // expect STR is lost here.
896     ck_assert(!CORE_RECV_END(&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
897     ck_assert(!core_test_check_conf((const gcs_act_cchange_t*)act_r.out, true, 0, 2));
898     free(act_r.out);
899     core_test_cleanup();
900 
901     // ========== from node2's view ==========
902     core_test_init(&config, false, "node2");
903 
904     // set frag size large enough to avoid fragmentation.
905     gu_info ("set payload size = 1024");
906     core_test_set_payload_size(1024);
907 
908     prim = gcs_comp_msg_new (true, false, 1, 2, 0);
909     ck_assert(NULL != prim);
910     gcs_comp_msg_add(prim, "node1", 0);
911     gcs_comp_msg_add(prim, "node2", 1);
912 
913     // node1 and node2 joins.
914     // now node2's status == GCS_NODE_STATE_PRIM
915     ck_assert(!gcs_dummy_set_component(Backend, prim));
916     ck_assert(!DUMMY_INJECT_COMPONENT(Backend, prim));
917     gu_free(prim);
918     ck_assert(gcs_dummy_inject_msg(Backend, uuid_buf, uuid_len,
919               GCS_MSG_STATE_UUID, 0) == (int)uuid_len);
920     ck_assert(gcs_dummy_inject_msg(Backend, state_buf, state_len,
921               GCS_MSG_STATE_MSG, 0)  == (int)state_len);
922     ck_assert(!CORE_RECV_ACT(&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
923     ck_assert(!core_test_check_conf((const gcs_act_cchange_t*)act_r.out, true, 1, 2));
924     free(act_r.out);
925 
926     // then node3 joins.
927     prim = gcs_comp_msg_new (true, false, 1, 3, 0);
928     ck_assert(NULL != prim);
929     gcs_comp_msg_add(prim, "node1", 0);
930     gcs_comp_msg_add(prim, "node2", 1);
931     gcs_comp_msg_add(prim, "node3", 2);
932     ck_assert(!gcs_dummy_set_component(Backend, prim));
933     ck_assert(!DUMMY_INJECT_COMPONENT(Backend, prim));
934     gu_free(prim);
935 
936     // generate a new state uuid.
937     gu_uuid_generate(&state_uuid, NULL, 0);
938     memcpy(uuid_buf, &state_uuid, uuid_len);
939 
940     // state exchange message from node3
941     group = gcs_core_get_group(Core);
942     state_msg = gcs_state_msg_create(&state_uuid,
943                                      &GU_UUID_NIL,
944                                      &GU_UUID_NIL,
945                                      GCS_SEQNO_ILL,
946                                      GCS_SEQNO_ILL,
947                                      GCS_SEQNO_ILL,
948                                      0,
949                                      GCS_NODE_STATE_NON_PRIM,
950                                      GCS_NODE_STATE_PRIM,
951                                      "node3", "127.0.0.1",
952                                      group->gcs_proto_ver,
953                                      group->repl_proto_ver,
954                                      group->appl_proto_ver,
955                                      group->prim_gcs_ver,
956                                      group->prim_repl_ver,
957                                      group->prim_appl_ver,
958                                      0, // desync count
959                                      0);
960     size_t state_len3 = gcs_state_msg_len (state_msg);
961     char state_buf3[state_len3];
962     gcs_state_msg_write (state_buf3, state_msg);
963     gcs_state_msg_destroy (state_msg);
964 
965     // updating state message from node1.
966     group = gcs_core_get_group(Core);
967     state_msg = gcs_group_get_state(group);
968     state_msg->flags = GCS_STATE_FREP | GCS_STATE_FCLA;
969     state_msg->prim_state = GCS_NODE_STATE_JOINED;
970     state_msg->current_state = GCS_NODE_STATE_SYNCED;
971     state_msg->state_uuid = state_uuid;
972     state_msg->name = "node1";
973     gcs_state_msg_write(state_buf, state_msg);
974     gcs_state_msg_destroy(state_msg);
975 
976     ck_assert(gcs_dummy_inject_msg(Backend, uuid_buf, uuid_len,
977                                    GCS_MSG_STATE_UUID, 0) == (int)uuid_len);
978     ck_assert(gcs_dummy_inject_msg(Backend, state_buf, state_len,
979                                    GCS_MSG_STATE_MSG, 0) == (int)state_len);
980 
981     // STR sneaks.
982     // we have to make same message exists in sender queue too.
983     // otherwise we will get following log
984     // "FIFO violation: queue empty when local action received"
985     const struct gu_buf act = {act_ptr, (ssize_t)act_size};
986     action_t act_s(&act, NULL, NULL, act_size, GCS_ACT_STATE_REQ, -1, (gu_thread_t)-1);
987     CORE_SEND_START(&act_s);
988     for(;;) {
989         usleep(10000);
990         gcs_fifo_lite_t* fifo = gcs_core_get_fifo(Core);
991         void* item = gcs_fifo_lite_get_head(fifo);
992         if (item) {
993             gcs_fifo_lite_release(fifo);
994             break;
995         }
996     }
997     ck_assert(gcs_dummy_inject_msg(Backend, msg_buf, msg_size,
998                                    GCS_MSG_ACTION, 1) == (int)msg_size);
999 
1000     ck_assert(gcs_dummy_inject_msg(Backend, state_buf3, state_len3,
1001                                    GCS_MSG_STATE_MSG, 2) == (int)state_len3);
1002 
1003     // expect STR and id == -EAGAIN.
1004     ck_assert(!CORE_RECV_ACT(&act_r, act_ptr, act_size, GCS_ACT_STATE_REQ));
1005     ck_assert(act_r.seqno == -EAGAIN);
1006     free(act_r.out);
1007 
1008     ck_assert(!CORE_RECV_ACT(&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE));
1009     ck_assert(!core_test_check_conf((const gcs_act_cchange_t*)act_r.out, true, 1, 3));
1010     free(act_r.out);
1011 
1012     // core_test_cleanup();
1013     // ==========
1014     gu_free(act_ptr);
1015 }
1016 END_TEST
1017 #endif /* GCS_ALLOW_GH74 */
1018 
1019 
1020 #if 0 // requires multinode support from gcs_dummy
// Placeholder test: it only initializes and tears down the core test fixture.
// It is compiled out by the enclosing "#if 0" because, per the note there, it
// requires multinode support from gcs_dummy.
// NOTE(review): core_test_init() is called without arguments here, while
// gcs_core_test_gh74 calls it as core_test_init(&config, <bool>, <name>) --
// confirm the current signature before re-enabling this test.
START_TEST (gcs_core_test_foreign)
{
    core_test_init ();

    core_test_cleanup ();
}
END_TEST
1028 #endif // 0
1029 
gcs_core_suite(void)1030 Suite *gcs_core_suite(void)
1031 {
1032   Suite *suite = suite_create("GCS core context");
1033   TCase *tcase = tcase_create("gcs_core");
1034 
1035   suite_add_tcase (suite, tcase);
1036   tcase_set_timeout(tcase, 60);
1037   bool skip = false;
1038 
1039   if (skip == false) {
1040       tcase_add_test  (tcase, gcs_code_msg);
1041       tcase_add_test  (tcase, gcs_core_test_api);
1042       tcase_add_test  (tcase, gcs_core_test_own_v0);
1043       tcase_add_test  (tcase, gcs_core_test_own_v1);
1044 #ifdef GCS_ALLOW_GH74
1045       tcase_add_test  (tcase, gcs_core_test_gh74);
1046 #endif /* GCS_ALLOW_GH74 */
1047       // tcase_add_test (tcase, gcs_core_test_foreign);
1048   }
1049   return suite;
1050 }
1051