1 /*
2  * Copyright (C) 2011-2020 Codership Oy <info@codership.com>
3  *
4  * $Id$
5  */
6 
7 #define GCACHE_RB_UNIT_TEST
8 
9 #include "gcache_rb_store.hpp"
10 #include "gcache_bh.hpp"
11 #include "gcache_rb_test.hpp"
12 
13 #include <gu_logger.hpp>
14 #include <gu_throw.hpp>
15 
16 using namespace gcache;
17 
18 static gu::UUID    const GID(NULL, 0);
19 static std::string const RB_NAME("rb_test");
20 static size_t      const BH_SIZE(sizeof(gcache::BufferHeader));
21 
22 typedef MemOps::size_type size_type;
23 
ALLOC_SIZE(size_type s)24 static size_type ALLOC_SIZE(size_type s)
25 {
26     return MemOps::align_size(s + BH_SIZE);
27 }
28 
START_TEST(test1)29 START_TEST(test1)
30 {
31     ::unlink(RB_NAME.c_str());
32 
33     size_t const rb_size(ALLOC_SIZE(2) * 2);
34 
35     seqno2ptr_t s2p(SEQNO_NONE);
36     gu::UUID   gid(GID);
37     RingBuffer rb(RB_NAME, rb_size, s2p, gid, 0, false);
38 
39     ck_assert_msg(rb.size() == rb_size,
40                   "Expected %zd, got %zd", rb_size, rb.size());
41 
42     if (gid != GID)
43     {
44         std::ostringstream os;
45         os << "Expected GID: " << GID << ", got: " << gid;
46         ck_abort_msg("%s", os.str().c_str());
47     }
48 
49     void* buf1 = rb.malloc (MemOps::align_size(rb_size/2 + 1));
50     ck_assert(NULL == buf1); // > 1/2 size
51 
52     buf1 = rb.malloc (ALLOC_SIZE(1));
53     ck_assert(NULL != buf1);
54 
55     BufferHeader* bh1(ptr2BH(buf1));
56     ck_assert(bh1->seqno_g == SEQNO_NONE);
57     ck_assert(!BH_is_released(bh1));
58 
59     void* buf2 = rb.malloc (ALLOC_SIZE(2));
60     ck_assert(NULL != buf2);
61     ck_assert(!BH_is_released(bh1));
62 
63     BufferHeader* bh2(ptr2BH(buf2));
64     ck_assert(bh2->seqno_g == SEQNO_NONE);
65     ck_assert(!BH_is_released(bh2));
66 
67     void* tmp = rb.realloc (buf1, ALLOC_SIZE(2));
68     // anything <= MemOps::ALIGNMENT should fit into original buffer
69     ck_assert(tmp == buf1 && MemOps::ALIGNMENT > 1);
70 
71     tmp = rb.realloc (buf1, ALLOC_SIZE(MemOps::ALIGNMENT + 1));
72     // should require new buffer for which there's no space
73     ck_assert(bh2->seqno_g == SEQNO_NONE);
74     ck_assert(NULL == tmp);
75 
76     BH_release(bh2);
77     rb.free (bh2);
78 
79     tmp = rb.realloc (buf1, ALLOC_SIZE(3));
80     if (MemOps::ALIGNMENT > 2)
81     {
82         ck_assert(NULL != tmp);
83         ck_assert(buf1 == tmp);
84     }
85     else
86     {
87         ck_assert(NULL == tmp);
88     }
89 
90     BH_release(bh1);
91     rb.free (bh1);
92     ck_assert(BH_is_released(bh1));
93 
94     buf1 = rb.malloc(ALLOC_SIZE(1));
95     ck_assert(NULL != buf1);
96 
97     tmp = rb.realloc (buf1, ALLOC_SIZE(2));
98     ck_assert(NULL != tmp);
99     ck_assert(tmp  == buf1);
100 
101     buf2 = rb.malloc (ALLOC_SIZE(1));
102     ck_assert(NULL != buf2);
103 
104     tmp = rb.realloc (buf2, ALLOC_SIZE(2));
105     ck_assert(NULL != tmp);
106     ck_assert(tmp  == buf2);
107 
108     tmp = rb.malloc (ALLOC_SIZE(1));
109     ck_assert(NULL == tmp);
110 
111     BH_release(ptr2BH(buf1));
112     rb.free(ptr2BH(buf1));
113 
114     BH_release(ptr2BH(buf2));
115     rb.free(ptr2BH(buf2));
116 
117     tmp = rb.malloc (ALLOC_SIZE(2));
118     ck_assert(NULL != tmp);
119 
120     mark_point();
121 }
122 END_TEST
123 
START_TEST(recovery)124 START_TEST(recovery)
125 {
126     struct msg
127     {
128         char    msg;
129         seqno_t g;
130         seqno_t d;
131 
132         size_t size() const { return sizeof(msg); }
133     };
134 
135 #define MAX_MSGS 10
136     struct msg msgs[MAX_MSGS] =
137     {
138         { '0',         1,         0 },
139         { '1',         2,         0 },
140         { '2',         4,         1 },
141         { '3', SEQNO_ILL, SEQNO_ILL },
142         { '4',         3,         1 },
143         { '5', SEQNO_ILL, SEQNO_ILL },
144         { '6',         5, SEQNO_ILL },
145         { '7', SEQNO_ILL, SEQNO_ILL },
146         { '8',         6,         4 },
147         { '9',         7,         4 }
148     };
149 
150     size_type const msg_size(ALLOC_SIZE(sizeof(reinterpret_cast<msg*>(0)->msg)));
151 
152     struct rb_ctx
153     {
154         size_t const size;
155         seqno2ptr_t  s2p;
156         gu::UUID     gid;
157         RingBuffer   rb;
158 
159         rb_ctx(size_t s, bool recover = true) :
160             size(s), s2p(SEQNO_NONE), gid(GID),
161             rb(RB_NAME, size, s2p, gid, 0, recover)
162         {}
163 
164         void seqno_assign (seqno2ptr_t& s2p, void* const ptr,
165                            seqno_t const g, seqno_t const d)
166         {
167             s2p.insert(g, ptr);
168 
169             BufferHeader* bh(ptr2BH(ptr));
170             bh->seqno_g = g;
171             if (d < 0) bh->flags |= BUFFER_SKIPPED;
172         }
173 
174         void* add_msg(struct msg& m)
175         {
176             void* ret(rb.malloc(ALLOC_SIZE(m.size())));
177 
178             if (ret)
179             {
180                 ::memcpy(ret, &m.msg, m.size());
181 
182                 if (m.g > 0) seqno_assign(s2p, ret, m.g, m.d);
183 
184                 BH_release(ptr2BH(ret));
185                 rb.free(ptr2BH(ret));
186             }
187 
188             return ret;
189         }
190 
191         void print_map()
192         {
193             std::ostringstream os;
194             os << "S2P map:\n";
195             for (seqno2ptr_t::iterator i = s2p.begin(); i != s2p.end(); ++i)
196             {
197                 log_info << "\tseqno: " << s2p.index(i) << ", msg: "
198                          << reinterpret_cast<const char*>(*i) << "\n";
199             }
200 
201             log_info << os.str();
202         }
203     };
204 
205     seqno_t seqno_min, seqno_max;
206     size_t const rb_5size(msg_size*5);
207 
208     {
209         rb_ctx ctx(rb_5size);
210 
211         ck_assert_msg(ctx.rb.size() == ctx.size,
212                       "Expected %zd, got %zd", ctx.size, ctx.rb.size());
213 
214         if (ctx.gid != GID)
215         {
216             std::ostringstream os;
217             os << "Expected GID: " << GID << ", got: " << ctx.gid;
218             ck_abort_msg("%s", os.str().c_str());
219         }
220 
221         ck_assert(ctx.s2p.empty());
222 
223         void* m(ctx.add_msg(msgs[0]));
224         ck_assert(NULL != m);
225         ck_assert(*ctx.s2p.find(msgs[0].g) == m);
226 
227         m = ctx.add_msg(msgs[1]);
228         ck_assert(NULL != m);
229         ck_assert(*ctx.s2p.find(msgs[1].g) == m);
230 
231         m = ctx.add_msg(msgs[2]);
232         ck_assert(NULL != m);
233         ck_assert(*ctx.s2p.find(msgs[2].g) == m);
234 
235         m = ctx.add_msg(msgs[3]);
236         ck_assert(NULL != m);
237         ck_assert(msgs[3].g <= 0);
238         ck_assert(ctx.s2p.find(msgs[3].g) == ctx.s2p.end());
239 
240         seqno_min = ctx.s2p.index_front();
241         seqno_max = ctx.s2p.index_back();
242     }
243 
244     /* What we have now is |111222444***|----| */
245     /* Reopening of the file should:
246      * 1) discard messages 1, 2 since there is a hole at 3. Only 4 should remain
247      * 2) trim the trailing unordered message
248      */
249     {
250         rb_ctx ctx(rb_5size);
251 
252         ck_assert_msg(ctx.rb.size() == ctx.size,
253                       "Expected %zd, got %zd", ctx.size, ctx.rb.size());
254 
255         if (ctx.gid != GID)
256         {
257             std::ostringstream os;
258             os << "Expected GID: " << GID << ", got: " << ctx.gid;
259             ck_abort_msg("%s", os.str().c_str());
260         }
261 
262         ck_assert(!ctx.s2p.empty());
263         ck_assert(ctx.s2p.size() == 1);
264         ck_assert(ctx.s2p.index_front() != seqno_min);
265         ck_assert(ctx.s2p.index_front() == seqno_max);
266 
267         void* m(ctx.add_msg(msgs[4]));
268         ck_assert(NULL != m);
269         ck_assert(*ctx.s2p.find(msgs[4].g) == m);
270 
271         m = ctx.add_msg(msgs[5]);
272         ck_assert(NULL != m);
273         ck_assert(msgs[5].g <= 0);
274         ck_assert(ctx.s2p.find(msgs[5].g) == ctx.s2p.end());
275 
276         m = ctx.add_msg(msgs[6]);
277         ck_assert(NULL != m);
278         ck_assert(*ctx.s2p.find(msgs[6].g) <= m);
279         // here we should have rollover
280         ck_assert(ptr2BH(m) == BH_cast(ctx.rb.start()));
281 
282         seqno_min = ctx.s2p.index_front();
283         seqno_max = ctx.s2p.index_back();
284     }
285 
286     /* What we have now is |555|---|444333***| */
287     /* Reopening of the file should:
288      * 1) discard discard unordered message at the end
289      * 2) continuous seqno interval should be now 3,4,5
290      */
291     {
292         rb_ctx ctx0(rb_5size);
293 
294         ck_assert_msg(ctx0.rb.size() == ctx0.size,
295                       "Expected %zd, got %zd", ctx0.size, ctx0.rb.size());
296 
297         if (ctx0.gid != GID)
298         {
299             std::ostringstream os;
300             os << "Expected GID: " << GID << ", got: " << ctx0.gid;
301             ck_abort_msg("%s", os.str().c_str());
302         }
303 
304         ck_assert(!ctx0.s2p.empty());
305         ck_assert(ctx0.s2p.size() == 3);
306         ck_assert(ctx0.s2p.index_front() == seqno_min);
307         ck_assert(ctx0.s2p.index_back()  == seqno_max);
308 
309         /* now try to open unclosed file. Results should be the same */
310         rb_ctx ctx(rb_5size);
311 
312         ck_assert_msg(ctx.rb.size() == ctx.size,
313                       "Expected %zd, got %zd", ctx.size, ctx.rb.size());
314 
315         if (ctx.gid != GID)
316         {
317             std::ostringstream os;
318             os << "Expected GID: " << GID << ", got: " << ctx.gid;
319             ck_abort_msg("%s", os.str().c_str());
320         }
321 
322         ck_assert(!ctx.s2p.empty());
323         ck_assert(ctx.s2p.size() == 3);
324         ck_assert(ctx.s2p.index_front() == seqno_min);
325         ck_assert(ctx.s2p.index_back()  == seqno_max);
326 
327         seqno_min = ctx.s2p.index_front();
328         seqno_max = ctx.s2p.index_back();
329     }
330 
331     size_t const rb_3size(msg_size*3);
332 
333     /* now try to truncate the buffer. Only seqno 4,5 should remain */
334     /* |555---444| */
335     {
336         rb_ctx ctx(rb_3size);
337 
338         ck_assert_msg(ctx.rb.size() == ctx.size,
339                       "Expected %zd, got %zd", ctx.size, ctx.rb.size());
340 
341         if (ctx.gid != GID)
342         {
343             std::ostringstream os;
344             os << "Expected GID: " << GID << ", got: " << ctx.gid;
345             ck_abort_msg("%s", os.str().c_str());
346         }
347 
348         ck_assert(!ctx.s2p.empty());
349         ck_assert(ctx.s2p.size() == 2);
350         ck_assert(ctx.s2p.index_begin() != seqno_min);
351         ck_assert(ctx.s2p.index_back()  == seqno_max);
352 
353         void* m(ctx.add_msg(msgs[8]));
354         ck_assert(NULL != m);
355         ck_assert(*ctx.s2p.find(msgs[8].g) == m);
356 
357         m = ctx.add_msg(msgs[9]);
358         ck_assert(NULL != m);
359         ck_assert(*ctx.s2p.find(msgs[9].g) == m);
360 
361         m = ctx.add_msg(msgs[7]);
362         ck_assert(NULL != m);
363         ck_assert(msgs[7].g <= 0);
364         ck_assert(ctx.s2p.find(msgs[7].g) == ctx.s2p.end());
365         // here we should have rollover
366         ck_assert(ptr2BH(m) == BH_cast(ctx.rb.start()));
367 
368         seqno_min = ctx.s2p.index_front();
369         seqno_max = ctx.s2p.index_back();
370     }
371 
372     /* what we should have now is |***---777| - only one segment, at the end */
373     {
374         /* first open this with known offset */
375         rb_ctx ctx0(rb_3size);
376 
377         ck_assert_msg(ctx0.rb.size() == ctx0.size,
378                       "Expected %zd, got %zd", ctx0.size, ctx0.rb.size());
379 
380         if (ctx0.gid != GID)
381         {
382             std::ostringstream os;
383             os << "Expected GID: " << GID << ", got: " << ctx0.gid;
384             ck_abort_msg("%s", os.str().c_str());
385         }
386 
387         ck_assert(!ctx0.s2p.empty());
388         ck_assert(ctx0.s2p.size() == 1);
389         ck_assert(ctx0.s2p.index_front() == seqno_max);
390         ck_assert(ctx0.s2p.index_back()  == seqno_max);
391 
392         /* now try to open unclosed file. Results should be the same */
393         rb_ctx ctx(rb_3size);
394 
395         ck_assert_msg(ctx.rb.size() == ctx.size,
396                       "Expected %zd, got %zd", ctx.size, ctx.rb.size());
397 
398         if (ctx.gid != GID)
399         {
400             std::ostringstream os;
401             os << "Expected GID: " << GID << ", got: " << ctx.gid;
402             ck_abort_msg("%s", os.str().c_str());
403         }
404 
405         ck_assert(!ctx.s2p.empty());
406         ck_assert(ctx.s2p.size() == 1);
407         ck_assert(ctx.s2p.index_front() == seqno_max);
408         ck_assert(ctx.s2p.index_back()  == seqno_max);
409 
410         ck_assert(seqno_max >= 1);
411         ck_assert(seqno_min == seqno_max);
412     }
413 
414     ::unlink(RB_NAME.c_str());
415 
416     /* test for singe segment in the middle */
417     ptrdiff_t third_buffer_offset(0);
418     {
419         rb_ctx ctx(rb_3size, false);
420 
421         ck_assert_msg(ctx.rb.size() == ctx.size,
422                       "Expected %zd, got %zd", ctx.size, ctx.rb.size());
423 
424         if (ctx.gid != GID)
425         {
426             std::ostringstream os;
427             os << "Expected GID: " << GID << ", got: " << ctx.gid;
428             ck_abort_msg("%s", os.str().c_str());
429         }
430 
431         ck_assert(ctx.s2p.empty());
432 
433         void* m(ctx.add_msg(msgs[3]));
434         ck_assert(NULL != m);
435         ck_assert(ctx.s2p.find(msgs[3].g) == ctx.s2p.end());
436 
437         m = ctx.add_msg(msgs[4]);
438         ck_assert(NULL != m);
439         ck_assert(*ctx.s2p.find(msgs[4].g) == m);
440 
441         m = ctx.add_msg(msgs[5]);
442         ck_assert(NULL != m);
443         ck_assert(ctx.s2p.find(msgs[5].g) == ctx.s2p.end());
444         third_buffer_offset = ctx.rb.offset(m);
445 
446         ck_assert(!ctx.s2p.empty());
447         ck_assert(ctx.s2p.size() == 1);
448         seqno_min = ctx.s2p.index_front();
449         seqno_max = ctx.s2p.index_back();
450         ck_assert(seqno_min == seqno_max);
451     }
452 
453     /* now the situation should be |***444***| - only one segment, in the middle,
454      * reopen the file with a known position */
455     {
456         rb_ctx ctx(rb_3size);
457 
458         ck_assert_msg(ctx.rb.size() == ctx.size,
459                       "Expected %zd, got %zd", ctx.size, ctx.rb.size());
460 
461         if (ctx.gid != GID)
462         {
463             std::ostringstream os;
464             os << "Expected GID: " << GID << ", got: " << ctx.gid;
465             ck_abort_msg("%s", os.str().c_str());
466         }
467 
468         ck_assert(!ctx.s2p.empty());
469         ck_assert(ctx.s2p.size() == 1);
470         ck_assert(seqno_min == ctx.s2p.index_begin());
471         ck_assert(seqno_max == ctx.s2p.index_back());
472         ck_assert(seqno_min == seqno_max);
473     }
474 
475     /* now the situation should be |---444---| - only one segment, in the middle,
476      * reopen the file a second time - to trigger a rollover bug */
477     {
478         rb_ctx ctx(rb_3size);
479 
480         ck_assert_msg(ctx.rb.size() == ctx.size,
481                       "Expected %zd, got %zd", ctx.size, ctx.rb.size());
482 
483         if (ctx.gid != GID)
484         {
485             std::ostringstream os;
486             os << "Expected GID: " << GID << ", got: " << ctx.gid;
487             ck_abort_msg("%s", os.str().c_str());
488         }
489 
490         ck_assert(!ctx.s2p.empty());
491         ck_assert(ctx.s2p.size() == 1);
492         ck_assert(seqno_min == ctx.s2p.index_front());
493         ck_assert(seqno_max == ctx.s2p.index_back());
494         ck_assert(seqno_min == seqno_max);
495 
496         // must be allocated right after the recovered buffer
497         void* m(ctx.add_msg(msgs[3]));
498         ck_assert(NULL != m);
499         ck_assert(third_buffer_offset == ctx.rb.offset(m));
500     }
501 
502     ::unlink(RB_NAME.c_str());
503 }
504 END_TEST
505 
506 
gcache_rb_suite()507 Suite* gcache_rb_suite()
508 {
509     Suite* ts = suite_create("gcache::RbStore");
510     TCase* tc = tcase_create("test");
511 
512     tcase_set_timeout(tc, 60);
513     tcase_add_test(tc, test1);
514     suite_add_tcase(ts, tc);
515 
516     tc = tcase_create("recovery");
517 
518     tcase_set_timeout(tc, 60);
519     tcase_add_test(tc, recovery);
520     suite_add_tcase(ts, tc);
521 
522     return ts;
523 }
524