1 /* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
2  *
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License as published by
5  * the Free Software Foundation; version 2 of the License.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
15  */
16 
17 #include "store.h"
18 
19 #include "log.h"
20 
21 #include <assert.h>
22 #include <errno.h>
23 #include <pthread.h>
24 #include <stdbool.h>
25 #include <stddef.h>   // ptrdiff_t
26 #include <stdint.h>   // uintptr_t
27 #include <stdlib.h>   // abort()
28 #include <string.h>   // memset()
29 
/*
 * Generates store_serialize_<INTTYPE>(): copies the native in-memory
 * representation of an INTTYPE##_t value into the buffer 'to' and returns the
 * number of bytes written, so callers can advance a write cursor with it.
 * Byte order is deliberately not converted, so the serialized form is only
 * portable between hosts of the same endianness.
 *
 * The instantiations below are not followed by ';': the macro expands to a
 * complete function definition, and an extra ';' at file scope is an empty
 * declaration that ISO C does not permit (-Wpedantic warns about it).
 */
#define DECLARE_SERIALIZE_INT(INTTYPE)                                  \
    static inline size_t                                                \
    store_serialize_##INTTYPE(void* const to, INTTYPE##_t const from)   \
    {                                                                   \
        memcpy(to, &from, sizeof(from)); /* for simplicity ignore endianness */ \
        return sizeof(from);                                            \
    }

DECLARE_SERIALIZE_INT(uint32)
DECLARE_SERIALIZE_INT(int64)
40 
/*
 * Generates store_deserialize_<INTTYPE>(): the inverse of
 * store_serialize_<INTTYPE>(). Copies sizeof(INTTYPE##_t) bytes from 'from'
 * into *to and returns the number of bytes consumed. No byte order conversion
 * is done.
 *
 * As with the serializers, the instantiations are not followed by ';' since
 * an empty declaration at file scope is not valid ISO C.
 */
#define DECLARE_DESERIALIZE_INT(INTTYPE)                                \
    static inline size_t                                                \
    store_deserialize_##INTTYPE(INTTYPE##_t* const to, const void* const from) \
    {                                                                   \
        memcpy(to, from, sizeof(*to)); /* for simplicity ignore endianness */ \
        return sizeof(*to);                                             \
    }

DECLARE_DESERIALIZE_INT(uint32)
DECLARE_DESERIALIZE_INT(int64)
51 
/* One element of the store. Records are kept in serialized form in
 * node_store::records and are copied in and out with memcpy() of
 * STORE_RECORD_SIZE bytes (see store_record_set()/store_record_get()), so the
 * member order below matters.
 *
 * version: seqno at which the record was last modified; initialized to
 *          WSREP_SEQNO_UNDEFINED by node_store_open() and compared against a
 *          transaction's read view seqno in node_store_execute().
 * value:   the payload; record i starts out holding value i. */
typedef struct record
{
    wsrep_seqno_t version;
    uint32_t      value;
    /* this order ensures that there is no padding between the members */
}
record_t;

/* Serialized size of a record: the sum of the member sizes. Unlike
 * sizeof(record_t), this excludes any trailing padding. */
#define STORE_RECORD_SIZE \
    (sizeof(((record_t*)(NULL))->version) + sizeof(((record_t*)(NULL))->value))
62 
63 static inline size_t
store_record_set(void * const base,size_t const index,const record_t * const record)64 store_record_set(void*           const base,
65                  size_t          const index,
66                  const record_t* const record)
67 {
68     char* const position = (char*)base + index*STORE_RECORD_SIZE;
69     memcpy(position, record, STORE_RECORD_SIZE);
70     return STORE_RECORD_SIZE;
71 }
72 
73 static inline size_t
store_record_get(const void * const base,size_t const index,record_t * const record)74 store_record_get(const void*     const base,
75                  size_t          const index,
76                  record_t*       const record)
77 {
78     const char* const position = (const char*)base + index*STORE_RECORD_SIZE;
79     memcpy(record, position, STORE_RECORD_SIZE);
80     return STORE_RECORD_SIZE;
81 }
82 
83 static inline bool
store_record_equal(const record_t * const lhs,const record_t * const rhs)84 store_record_equal(const record_t* const lhs, const record_t* const rhs)
85 {
86     return (lhs->version == rhs->version) && (lhs->value == rhs->value);
87 }
88 
/* transaction context */
struct store_trx_op
{
    /* One operation of a transaction. In node_store_execute() it reads records
     * idx_from and idx_to and sets new_value to the source record value + 1.
     *
     * Normally what we'd need for transaction context is the record index and
     * new record value. Here we also save the read view snapshot (rec_from &
     * rec_to) in order to:
     * 1. test provider certification correctness if the provider supports
     *    read view;
     * 2. if not, detect conflicts at the store level. */
    record_t rec_from;
    record_t rec_to;
    uint32_t idx_from;
    uint32_t idx_to;
    uint32_t new_value;
    uint32_t size; /* nominal "size" of operation to manipulate on-the-wire
                    * writeset size. */
};

/* Minimal serialized size of struct store_trx_op as written by
 * store_serialize_op(): the sum of the member sizes, with no padding. The
 * on-the-wire size of an operation (store_trx_op::size) may be larger. */
#define STORE_OP_SIZE (STORE_RECORD_SIZE + STORE_RECORD_SIZE +           \
                       sizeof(((struct store_trx_op*)NULL)->idx_from) +  \
                       sizeof(((struct store_trx_op*)NULL)->idx_to) +    \
                       sizeof(((struct store_trx_op*)NULL)->new_value) + \
                       sizeof(((struct store_trx_op*)NULL)->size))
111 
/* Per-transaction context, stored in a transaction pool slot */
struct store_trx_ctx
{
    wsrep_gtid_t         rv_gtid; /* read view: the store GTID at the first
                                   * operation (node_store_execute()) or the
                                   * one found in the writeset
                                   * (node_store_apply()) */
    size_t               ops_num; /* number of elements in ops */
    struct store_trx_op* ops;     /* heap array grown by store_trx_add_op() */
};
118 
119 static inline bool
store_trx_add_op(struct store_trx_ctx * const trx)120 store_trx_add_op(struct store_trx_ctx* const trx)
121 {
122     struct store_trx_op* const new_ops =
123         realloc(trx->ops, sizeof(struct store_trx_op)*(trx->ops_num + 1));
124 
125     if (new_ops)
126     {
127         trx->ops = new_ops;
128 #ifndef NDEBUG
129         memset(&trx->ops[trx->ops_num], 0, sizeof(*trx->ops));
130 #endif
131         trx->ops_num++;
132     }
133 
134     return (NULL == new_ops);
135 }
136 
/* Slot of the transaction pool. Slots are laid out back to back right after
 * struct node_store (see node_store_open()), each one followed by
 * node_store::op_size bytes of buffer used to serialize operations.
 * node_store_execute() reaches that buffer as (&ctx + 1), which assumes ctx is
 * the last member and the struct has no trailing padding after it. */
struct store_trx_entry
{
    bool                 used; /* slot currently belongs to a transaction */
    struct store_trx_ctx ctx;
};

/* A cluster member, identified by its wsrep UUID */
typedef wsrep_uuid_t member_t;
144 
/* The store. The transaction pool of (entries_mask + 1) slots, each a
 * struct store_trx_entry followed by op_size bytes, is allocated in the same
 * memory chunk right after this struct (see node_store_open() and
 * store_get_trx_entry()). */
struct node_store
{
    wsrep_gtid_t    gtid;        /* GTID of the current store state */
    pthread_mutex_t gtid_mtx;    /* held by the functions below when accessing
                                  * gtid, snapshot, members and records */
    wsrep_trx_id_t  trx_id;      /* last allocated transaction ID */
    pthread_mutex_t trx_id_mtx;  /* held when updating trx_id and pool slot
                                  * 'used' flags */
    char*           snapshot;    /* state snapshot between
                                  * node_store_acquire_state() and
                                  * node_store_release_state(), else NULL */
    member_t*       members;     /* current membership, members_num entries */
    void*           records;     /* records_num serialized records */
    size_t          op_size;     /* nominal on-the-wire size of an operation */
    long            read_view_fails; /* NOTE(review): not referenced in the
                                      * code shown here */
    uint32_t        members_num;
    uint32_t        records_num;
    uint32_t        entries_mask; /* trx pool size - 1 (size is a power of 2) */
    bool            read_view_support; // read view support by cluster
    /* trx pool piggybacked */
};
162 
163 node_store_t*
node_store_open(const struct node_options * const opts)164 node_store_open(const struct node_options* const opts)
165 {
166     /* make the size of trx pool the next highest power of 2 over the total
167      * number of workers */
168     uint32_t trx_pool_mask = (uint32_t)(opts->masters + opts->slaves);
169     if (trx_pool_mask > 0)
170     {
171         trx_pool_mask -= 1;
172         trx_pool_mask |= trx_pool_mask >> 1;
173         trx_pool_mask |= trx_pool_mask >> 2;
174         trx_pool_mask |= trx_pool_mask >> 4;
175         trx_pool_mask |= trx_pool_mask >> 8;
176         trx_pool_mask |= trx_pool_mask >> 16;
177     }
178     assert(((trx_pool_mask + 1) & trx_pool_mask) == 0); // 2^n - 1
179 
180     size_t const desired_op_size = (size_t)(opts->ws_size/opts->operations);
181     size_t const op_size = (desired_op_size > STORE_OP_SIZE ?
182                             desired_op_size : STORE_OP_SIZE);
183 
184     /* since the number of workers will never change, we can allocate trx pool
185      * together with the main store struc */
186     size_t const store_alloc_size = sizeof(struct node_store) +
187         /* op_size - additional buffer for op serialization per trx */
188         (sizeof(struct store_trx_entry) + op_size)*(trx_pool_mask + 1);
189 
190     struct node_store* const ret = malloc(store_alloc_size);
191 
192     if (ret)
193     {
194         memset(ret, 0, store_alloc_size);
195         ret->records = malloc((size_t)opts->records * STORE_RECORD_SIZE);
196 
197         if (ret->records)
198         {
199             ret->gtid = WSREP_GTID_UNDEFINED;
200             pthread_mutex_init(&ret->gtid_mtx, NULL);
201             pthread_mutex_init(&ret->trx_id_mtx, NULL);
202             ret->op_size      = op_size;
203             ret->records_num  = (uint32_t)opts->records;
204             ret->entries_mask = trx_pool_mask;
205 
206             uint32_t i;
207             for (i = 0; i < ret->records_num; i++)
208             {
209                 /* keep state in serialized form for easy snapshotting */
210                 struct record const record = { WSREP_SEQNO_UNDEFINED, i };
211                 store_record_set(ret->records, i, &record);
212             }
213 
214             return ret;
215         }
216         else
217         {
218             free(ret);
219         }
220     }
221 
222     return NULL;
223 }
224 
225 void
node_store_close(struct node_store * const store)226 node_store_close(struct node_store* const store)
227 {
228     assert(store);
229     assert(store->records);
230     pthread_mutex_destroy(&store->gtid_mtx);
231     pthread_mutex_destroy(&store->trx_id_mtx);
232     free(store->records);
233     free(store->members);
234     free(store);
235 }
236 
/*
 * Locks the pthread mutex pointed to by mtx. On failure it logs the error
 * with NODE_FATAL and aborts the process.
 *
 * Wrapped in do { } while (0) so that "STORE_MUTEX_LOCK(m);" is a single
 * statement. A bare { } block followed by ';' breaks if/else chains. The
 * local variable has an unusual name so that it cannot shadow a variable
 * used in the mtx argument expression.
 */
#define STORE_MUTEX_LOCK(mtx)                                     \
    do                                                            \
    {                                                             \
        int const store_mtx_err_ = pthread_mutex_lock(mtx);       \
        if (store_mtx_err_)                                       \
        {                                                         \
            NODE_FATAL("Failed to lock " #mtx ": %d (%s)",        \
                       store_mtx_err_, strerror(store_mtx_err_)); \
            abort();                                              \
        }                                                         \
    } while (0)
247 
248 static inline struct store_trx_entry*
store_get_trx_entry(struct node_store * const store,wsrep_trx_id_t const trx_id)249 store_get_trx_entry(struct node_store* const store, wsrep_trx_id_t const trx_id)
250 {
251     return (struct store_trx_entry*)
252         ((char*)(store + 1) + (trx_id & store->entries_mask)*
253          (sizeof(struct store_trx_entry) + store->op_size));
254 }
255 
256 static inline struct store_trx_ctx*
store_get_trx_ctx(struct node_store * const store,wsrep_trx_id_t const trx_id)257 store_get_trx_ctx(struct node_store* const store, wsrep_trx_id_t const trx_id)
258 {
259     return &(store_get_trx_entry(store, trx_id)->ctx);
260 }
261 
262 static inline wsrep_trx_id_t
store_new_trx_id(struct node_store * const store)263 store_new_trx_id(struct node_store* const store)
264 {
265     wsrep_trx_id_t ret;
266     struct store_trx_entry* trx;
267 
268     STORE_MUTEX_LOCK(&store->trx_id_mtx);
269 
270     do
271     {
272         store->trx_id++;
273         trx = store_get_trx_entry(store, store->trx_id);
274     }
275     while (trx->used);
276     trx->used = true;
277     ret = store->trx_id;
278 
279     pthread_mutex_unlock(&store->trx_id_mtx);
280 
281     memset(&trx->ctx, 0, sizeof(trx->ctx));
282 
283     return ret;
284 }
285 
286 static inline void
store_free_trx_id(struct node_store * const store,wsrep_trx_id_t const trx_id)287 store_free_trx_id(struct node_store* const store, wsrep_trx_id_t const trx_id)
288 {
289     struct store_trx_entry* const trx = store_get_trx_entry(store, trx_id);
290     assert(trx->used);
291     free(trx->ctx.ops);
292 
293     STORE_MUTEX_LOCK(&store->trx_id_mtx);
294 
295     trx->used = false;
296 
297     pthread_mutex_unlock(&store->trx_id_mtx);
298 }
299 
300 /**
301  * deserializes membership from snapshot */
302 static int
store_new_members(const char * ptr,const char * const endptr,uint32_t * const num,member_t ** const memb)303 store_new_members(const char* ptr, const char* const endptr,
304                   uint32_t* const num, member_t** const memb)
305 {
306     ptr += store_deserialize_uint32(num, ptr);
307 
308     if (*num < 2)
309     {
310         NODE_ERROR("Bogus number of members %u", *num);
311         return -1;
312     }
313 
314     int ret = (int)sizeof(*num);
315 
316     size_t const msize = sizeof(member_t) * *num;
317     if ((endptr - ptr) < (ptrdiff_t)msize)
318     {
319         NODE_ERROR("State snapshot does not contain all membership: "
320                    "%zd < %zu", endptr - ptr, msize);
321         return -1;
322     }
323 
324     *memb = calloc(*num, sizeof(member_t));
325     if (!*memb)
326     {
327         NODE_ERROR("Could not allocate new membership");
328         return -ENOMEM;
329     }
330 
331     memcpy(*memb, ptr, msize);
332 
333     return ret + (int)msize;
334 }
335 
336 /**
337  * deserializes records from snapshot */
338 static int
store_new_records(const char * ptr,const char * const endptr,uint32_t * const num,void ** const rec)339 store_new_records(const char* ptr, const char* const endptr,
340                   uint32_t* const num, void** const rec)
341 {
342     ptr += store_deserialize_uint32(num, ptr);
343 
344     int ret = (int)sizeof(*num);
345     if (!*num)
346     {
347         *rec = NULL;
348         return ret;
349     }
350 
351     size_t const rsize = STORE_RECORD_SIZE * *num;
352     if ((endptr - ptr) < (ptrdiff_t)rsize)
353     {
354         NODE_ERROR("State snapshot does not contain all records: "
355                    "%zu < %zu", endptr - ptr, rsize);
356         return -1;
357     }
358 
359     *rec = malloc(rsize);
360     if (!*rec)
361     {
362         NODE_ERROR("Could not allocate new records");
363         return -ENOMEM;
364     }
365 
366     memcpy(*rec, ptr, rsize);
367 
368     return ret + (int)rsize;
369 }
370 
371 int
node_store_init_state(struct node_store * const store,const void * const state,size_t const state_len)372 node_store_init_state(struct node_store*  const store,
373                       const void*         const state,
374                       size_t              const state_len)
375 {
376     /* First, deserialize and prepare new state */
377     if (state_len <= sizeof(member_t)*2 /* at least two members */ +
378         WSREP_UUID_STR_LEN + 1 /* : */ + 1 /* seqno */ + 1 /* \0 */)
379     {
380         NODE_ERROR("State snapshot too short: %zu", state_len);
381         return -1;
382     }
383 
384     wsrep_gtid_t state_gtid;
385     int ret;
386     ret = wsrep_gtid_scan(state, state_len, &state_gtid);
387     if (ret < 0)
388     {
389         char state_str[WSREP_GTID_STR_LEN + 1] = { 0, };
390         memcpy(state_str, state, sizeof(state_str) - 1);
391         NODE_ERROR("Could not find valid GTID in the received data: %s",
392                     state_str);
393         return -1;
394     }
395 
396     ret++; /* \0 */
397     if ((state_len - (size_t)ret) < sizeof(uint32_t))
398     {
399         NODE_ERROR("State snapshot does not contain the number of members");
400         return -1;
401     }
402 
403     const char* ptr = ((char*)state);
404     const char* const endptr = ptr + state_len;
405     ptr += ret;
406 
407     uint32_t m_num;
408     member_t* new_members;
409     ret = store_new_members(ptr, endptr, &m_num, &new_members);
410     if (ret < 0)
411     {
412         return ret;
413     }
414     ptr += ret;
415 
416     bool const read_view_support = ptr[0];
417     ptr += 1;
418 
419     uint32_t r_num;
420     void* new_records;
421     ret = store_new_records(ptr, endptr, &r_num, &new_records);
422     if (ret < 0)
423     {
424         free(new_members);
425         return ret;
426     }
427     ptr += ret;
428 
429     STORE_MUTEX_LOCK(&store->gtid_mtx);
430 
431     /* just a sanity check */
432     if (0 == wsrep_uuid_compare(&state_gtid.uuid, &store->gtid.uuid) &&
433         state_gtid.seqno < store->gtid.seqno)
434     {
435         NODE_ERROR("Received snapshot that is in the past: my seqno %lld,"
436                    " received seqno: %lld",
437                    (long long)store->gtid.seqno, (long long)state_gtid.seqno);
438         free(new_members);
439         free(new_records);
440         ret = -1;
441     }
442     else
443     {
444         free(store->members);
445         store->members_num = m_num;
446         store->members     = new_members;
447         free(store->records);
448         store->records_num = r_num;
449         store->records     = new_records;
450         store->gtid        = state_gtid;
451         store->read_view_support = read_view_support;
452         ret = 0;
453     }
454 
455     pthread_mutex_unlock(&store->gtid_mtx);
456 
457     return ret;
458 }
459 
460 int
node_store_acquire_state(node_store_t * const store,const void ** const state,size_t * const state_len)461 node_store_acquire_state(node_store_t* const store,
462                          const void**  const state,
463                          size_t*       const state_len)
464 {
465     int ret = 0;
466 
467     STORE_MUTEX_LOCK(&store->gtid_mtx);
468 
469     if (!store->snapshot)
470     {
471         size_t const memb_len = store->members_num * sizeof(member_t);
472         size_t const rec_len  = store->records_num * STORE_RECORD_SIZE;
473         size_t const buf_len  = WSREP_GTID_STR_LEN + 1
474             + sizeof(uint32_t) + memb_len
475             + 1 /* read view support */
476             + sizeof(uint32_t) + rec_len;
477 
478         store->snapshot = malloc(buf_len);
479 
480         if (store->snapshot)
481         {
482             char* ptr = store->snapshot;
483 
484             /* state GTID */
485             ret = wsrep_gtid_print(&store->gtid, ptr, buf_len);
486             if (ret > 0)
487             {
488                 NODE_INFO("");
489                 assert((size_t)ret < buf_len);
490 
491                 ptr[ret] = '\0';
492                 ret++;
493                 ptr += ret;
494                 assert((size_t)ret < buf_len);
495 
496                 /* membership */
497                 ptr += store_serialize_uint32(ptr, store->members_num);
498                 ret += (int)sizeof(uint32_t);
499                 assert((size_t)ret + memb_len < buf_len);
500                 memcpy(ptr, store->members, memb_len);
501                 ptr += memb_len;
502                 ret += (int)memb_len;
503                 assert((size_t)ret + sizeof(uint32_t) <= buf_len);
504 
505                 /* read view support */
506                 ptr[0] = store->read_view_support;
507                 ptr += 1;
508                 ret += 1;
509 
510                 /* records */
511                 ptr += store_serialize_uint32(ptr, store->records_num);
512                 ret += (int)sizeof(uint32_t);
513                 assert((size_t)ret + rec_len < buf_len);
514                 memcpy(ptr, store->records, rec_len);
515                 ret += (int)rec_len;
516                 assert((size_t)ret <= buf_len);
517             }
518             else
519             {
520                 NODE_ERROR("Failed to record GTID: %d (%s)", ret,strerror(-ret));
521                 free(store->snapshot);
522                 store->snapshot = 0;
523             }
524         }
525         else
526         {
527             NODE_ERROR("Failed to allocate snapshot buffer of size %zu",buf_len);
528             ret = -ENOMEM;
529         }
530     }
531     else
532     {
533         assert(0); /* provider should prevent such situation */
534         ret = -EAGAIN;
535     }
536 
537     pthread_mutex_unlock(&store->gtid_mtx);
538 
539     if (ret > 0)
540     {
541         NODE_INFO("\n\nPrepared snapshot of %u records\n\n", store->records_num);
542         *state     = store->snapshot;
543         *state_len = (size_t)ret;
544         ret        = 0;
545     }
546 
547     return ret;
548 }
549 
550 void
node_store_release_state(node_store_t * const store)551 node_store_release_state(node_store_t* const store)
552 {
553     STORE_MUTEX_LOCK(&store->gtid_mtx);
554 
555     assert(store->snapshot);
556     free(store->snapshot);
557     store->snapshot = 0;
558 
559     pthread_mutex_unlock(&store->gtid_mtx);
560 }
561 
562 int
node_store_update_membership(struct node_store * const store,const wsrep_view_info_t * const v)563 node_store_update_membership(struct node_store*       const store,
564                              const wsrep_view_info_t* const v)
565 {
566     assert(store);
567     assert(WSREP_VIEW_PRIMARY == v->status);
568         assert(v->memb_num > 0);
569 
570     STORE_MUTEX_LOCK(&store->gtid_mtx);
571 
572     bool const continuation = v->state_id.seqno == store->gtid.seqno + 1 &&
573         0 == wsrep_uuid_compare(&v->state_id.uuid, &store->gtid.uuid);
574 
575     bool const initialization = WSREP_SEQNO_UNDEFINED == store->gtid.seqno &&
576         0 == wsrep_uuid_compare(&WSREP_UUID_UNDEFINED, &store->gtid.uuid);
577 
578     if (!(continuation || initialization))
579     {
580         char store_str[WSREP_GTID_STR_LEN + 1] = { 0, };
581         wsrep_gtid_print(&store->gtid, store_str, sizeof(store_str));
582         char view_str[WSREP_GTID_STR_LEN + 1] = { 0, };
583         wsrep_gtid_print(&v->state_id, view_str, sizeof(view_str));
584 
585         NODE_FATAL("Attempt to initialize store GTID from incompatible view:\n"
586                    "\tstore: %s\n"
587                    "\tview:  %s",
588                    store_str, view_str);
589         abort();
590     }
591 
592     wsrep_uuid_t* const new_members = calloc(sizeof(wsrep_uuid_t),
593                                              (size_t)v->memb_num);
594     if (!new_members)
595     {
596         NODE_FATAL("Could not allocate new members array");
597         abort();
598     }
599 
600     int i;
601     for (i = 0; i < v->memb_num; i++)
602     {
603         new_members[i] = v->members[i].id;
604     }
605 
606     /* REPLICATION: at this point we should compare old and new memberships and
607      *              rollback all streaming transactions from the partitioned
608      *              members, if any. But we don't support it in this program yet.
609      */
610 
611     free(store->members);
612 
613     store->members     = new_members;
614     store->members_num = (uint32_t)v->memb_num;
615     store->gtid        = v->state_id;
616     store->read_view_support = (v->capabilities & WSREP_CAP_SNAPSHOT);
617 
618     pthread_mutex_unlock(&store->gtid_mtx);
619 
620     return 0;
621 }
622 
623 void
node_store_gtid(struct node_store * const store,wsrep_gtid_t * const gtid)624 node_store_gtid(struct node_store* const store,
625                 wsrep_gtid_t*      const gtid)
626 {
627     assert(store);
628 
629     STORE_MUTEX_LOCK(&store->gtid_mtx);
630 
631     *gtid = store->gtid;
632 
633     pthread_mutex_unlock(&store->gtid_mtx);
634 }
635 
636 
637 static inline void
store_serialize_op(void * const buf,const struct store_trx_op * const op)638 store_serialize_op(void* const buf, const struct store_trx_op* const op)
639 {
640     char* ptr = buf;
641     ptr += store_record_set(ptr, 0, &op->rec_from);
642     ptr += store_record_set(ptr, 0, &op->rec_to);
643     ptr += store_serialize_uint32(ptr, op->idx_from);
644     ptr += store_serialize_uint32(ptr, op->idx_to);
645     ptr += store_serialize_uint32(ptr, op->new_value);
646     store_serialize_uint32(ptr, op->size);
647 }
648 
649 static inline void
store_deserialize_op(struct store_trx_op * const op,const void * const buf)650 store_deserialize_op(struct store_trx_op* const op, const void* const buf)
651 {
652     const char* ptr = buf;
653     ptr += store_record_get(ptr, 0, &op->rec_from);
654     ptr += store_record_get(ptr, 0, &op->rec_to);
655     ptr += store_deserialize_uint32(&op->idx_from, ptr);
656     ptr += store_deserialize_uint32(&op->idx_to, ptr);
657     ptr += store_deserialize_uint32(&op->new_value, ptr);
658     store_deserialize_uint32(&op->size, ptr);
659 }
660 
661 static inline void
store_serialize_gtid(void * const buf,const wsrep_gtid_t * const gtid)662 store_serialize_gtid(void* const buf, const wsrep_gtid_t* const gtid)
663 {
664     char* ptr = buf;
665     memcpy(ptr, &gtid->uuid, sizeof(gtid->uuid));
666     ptr += sizeof(gtid->uuid);
667     store_serialize_int64(ptr, gtid->seqno);
668 }
669 
670 static inline void
store_deserialize_gtid(wsrep_gtid_t * const gtid,const void * const buf)671 store_deserialize_gtid(wsrep_gtid_t* const gtid, const void* const buf)
672 {
673     const char* ptr = buf;
674     memcpy(&gtid->uuid, ptr, sizeof(gtid->uuid));
675     ptr += sizeof(gtid->uuid);
676     store_deserialize_int64(&gtid->seqno, ptr);
677 }
678 
/* Serialized size of a GTID as written by store_serialize_gtid(): the raw
 * UUID bytes followed by a 64-bit seqno. */
#define STORE_GTID_SIZE (sizeof(((wsrep_gtid_t*)(NULL))->uuid) + sizeof(int64_t))
680 
681 int
node_store_execute(node_store_t * const store,wsrep_t * const wsrep,wsrep_ws_handle_t * const ws_handle)682 node_store_execute(node_store_t*      const store,
683                    wsrep_t*           const wsrep,
684                    wsrep_ws_handle_t* const ws_handle)
685 {
686     assert(store);
687 
688     if (0 == ws_handle->trx_id)
689     {
690         assert(sizeof(ws_handle->trx_id) >= sizeof(uintptr_t));
691         ws_handle->trx_id = store_new_trx_id(store);
692     }
693 
694     struct store_trx_ctx* trx = store_get_trx_ctx(store, ws_handle->trx_id);
695     if (store_trx_add_op(trx)) return -ENOMEM;
696     struct store_trx_op* const op = &trx->ops[trx->ops_num - 1];
697 
698     STORE_MUTEX_LOCK(&store->gtid_mtx);
699 
700     if (1 == trx->ops_num)
701     {
702         /* First operation, save ID of the read view of the transaction */
703         trx->rv_gtid = store->gtid;
704     }
705 
706     /* Transaction op: copy value from one random record to another... */
707     op->idx_from = (uint32_t)rand() % store->records_num;
708     op->idx_to   = (uint32_t)rand() % store->records_num;
709     store_record_get(store->records, op->idx_from, &op->rec_from);
710     store_record_get(store->records, op->idx_to,   &op->rec_to);
711 
712     pthread_mutex_unlock(&store->gtid_mtx);
713 
714     wsrep_status_t ret = WSREP_TRX_FAIL;
715 
716     if (op->rec_from.version > trx->rv_gtid.seqno ||
717         op->rec_to.version   > trx->rv_gtid.seqno)
718     {
719         /* transaction read view changed, trx needs to be restarted */
720 #if 0
721         NODE_INFO("Transaction read view changed: %lld -> %lld, returning %d",
722                   (long long)trx->rv_gtid.seqno,
723                   (long long)(op->rec_from.version > op->rec_to.version ?
724                               op->rec_from.version : op->rec_to.version),
725                   ret);
726 #endif
727         goto error;
728     }
729 
730     /* Transaction op: ... and modify it somehow, e.g. increment by 1 */
731     op->new_value = op->rec_from.value + 1;
732 
733     if (1 == trx->ops_num) // first trx operation
734     {
735         /* REPLICATION: Since this application does not implement record locks,
736          *              it needs to establish read view for each transaction for
737          *              a proper conflict detection and transaction isolation.
738          *              Otherwose we'll need to implement record versioning */
739         if (store->read_view_support)
740         {
741             ret = wsrep->assign_read_view(wsrep, ws_handle, &trx->rv_gtid);
742             if (ret)
743             {
744                 NODE_ERROR("wsrep::assign_read_view(%lld) failed: %d",
745                            trx->rv_gtid.seqno, ret);
746                 goto error;
747             }
748         }
749 
750         /* Record read view in the writeset for debugging purposes */
751         assert(store->op_size > STORE_GTID_SIZE);
752         store_serialize_gtid(trx + 1, &trx->rv_gtid);
753         wsrep_buf_t ws = { .ptr = trx + 1, .len = STORE_GTID_SIZE };
754         ret = wsrep->append_data(wsrep, ws_handle, &ws, 1, WSREP_DATA_ORDERED,
755                                  true);
756         if (ret)
757         {
758             NODE_ERROR("wsrep::append_data(rv_gtid) failed: %d", ret);
759             goto error;
760         }
761     }
762 
763     /* REPLICATION: append keys touched by the operation
764      *
765      * NOTE: depending on data access granularity some applications may require
766      *       multipart keys, e.g. <schema>:<table>:<row> in a SQL database.
767      *       Single part keys match hashtables and key-value stores.
768      *       Below we have two different single-part keys which reference two
769      *       different records. */
770     uint32_t    key_val;
771     wsrep_buf_t key_part = { .ptr = &key_val, .len = sizeof(key_val) };
772     wsrep_key_t ws_key   = { .key_parts = &key_part, .key_parts_num = 1 };
773 
774     /* REPLICATION: Key 1 - the key of the source, unchanged record */
775     store_serialize_uint32(&key_val, op->idx_from);
776     ret = wsrep->append_key(wsrep, ws_handle,
777                             &ws_key,
778                             1,   /* single key */
779                             WSREP_KEY_REFERENCE,
780                             true /* provider shall make a copy of the key */);
781     if (ret)
782     {
783         NODE_ERROR("wsrep::append_key(REFERENCE) failed: %d", ret);
784         goto error;
785     }
786 
787     /* REPLICATION: Key 2 - the key of the record we want to update */
788     store_serialize_uint32(&key_val, op->idx_to);
789     ret = wsrep->append_key(wsrep, ws_handle,
790                             &ws_key,
791                             1,   /* single key */
792                             WSREP_KEY_UPDATE,
793                             true /* provider shall make a copy of the key */);
794     if (ret)
795     {
796         NODE_ERROR("wsrep::append_key(UPDATE) failed: %d", ret);
797         goto error;
798     }
799 
800     /* REPLICATION: append transaction operation to the "writeset"
801      *              (WS buffer was allocated together with trx context above) */
802     assert(store->op_size >= STORE_OP_SIZE);
803     assert(store->op_size == (uint32_t)store->op_size);
804     op->size = (uint32_t)store->op_size;
805     store_serialize_op(trx + 1, op);
806     wsrep_buf_t ws = { .ptr = trx + 1, .len = store->op_size };
807     ret = wsrep->append_data(wsrep, ws_handle, &ws, 1, WSREP_DATA_ORDERED, true);
808 
809     if (!ret) return 0;
810 
811     NODE_ERROR("wsrep::append_data(op) failed: %d", ret);
812 
813 error:
814     store_free_trx_id(store, ws_handle->trx_id);
815 
816     return ret;
817 }
818 
819 int
node_store_apply(node_store_t * const store,wsrep_trx_id_t * const trx_id,const wsrep_buf_t * const ws)820 node_store_apply(node_store_t*      const store,
821                  wsrep_trx_id_t*    const trx_id,
822                  const wsrep_buf_t* const ws)
823 {
824     assert(store);
825     (void)store;
826 
827     *trx_id = store_new_trx_id(store);
828     struct store_trx_ctx* const trx = store_get_trx_ctx(store, *trx_id);
829 
830     /* prepare trx context for commit */
831     const char* ptr = ws->ptr;
832     size_t left     = ws->len;
833 
834     /* at least one operation should be there */
835     assert(left >= STORE_GTID_SIZE + STORE_OP_SIZE);
836 
837     if (left >= STORE_GTID_SIZE)
838     {
839         store_deserialize_gtid(&trx->rv_gtid, ptr);
840         left -= STORE_GTID_SIZE;
841         ptr  += STORE_GTID_SIZE;
842     }
843 
844     while (left >= STORE_OP_SIZE)
845     {
846         if (store_trx_add_op(trx))
847         {
848             store_free_trx_id(store,*trx_id); /* "rollback": release resources */
849             return -ENOMEM;
850         }
851         struct store_trx_op* const op = &trx->ops[trx->ops_num - 1];
852 
853         store_deserialize_op(op, ptr);
854         assert(op->idx_to <= store->records_num);
855 
856         left -= op->size;
857         ptr  += op->size;
858     }
859 
860     if (left != 0)
861     {
862         NODE_FATAL("Failed to process last (%d/%zu) bytes of the writeset.",
863                    (int)left, ws->len);
864         abort();
865     }
866 
867     return 0;
868 }
869 
/* 32-bit FNV-1a offset basis (2166136261) */
static uint32_t const store_fnv32_seed  = 0x811C9DC5U;

/**
 * Folds len bytes at buf into a 32-bit FNV-1a hash, starting from seed.
 *
 * Since the hash state is just the running value, passing a previous result
 * as seed hashes several buffers as if they were one contiguous buffer.
 *
 * @return the updated hash value
 */
static inline uint32_t
store_fnv32a(const void* buf, size_t const len, uint32_t seed)
{
    static uint32_t const fnv32_prime = 0x01000193U; /* 16777619 */
    const uint8_t* const bytes = (const uint8_t*)buf;
    size_t n;

    for (n = 0; n < len; ++n)
    {
        seed = (seed ^ bytes[n]) * fnv32_prime;
    }

    return seed;
}
887 
888 
889 static void
store_checksum_state(node_store_t * store)890 store_checksum_state(node_store_t* store)
891 {
892     uint32_t res = store_fnv32_seed;
893     uint32_t i;
894 
895     for (i = 0; i < store->members_num; i++)
896     {
897         res = store_fnv32a(&store->members[i], sizeof(*store->members), res);
898     }
899 
900     res = store_fnv32a(store->records, store->records_num * STORE_RECORD_SIZE,
901                        res);
902 
903     res = store_fnv32a(&store->gtid.uuid, sizeof(store->gtid.uuid), res);
904 
905     wsrep_seqno_t s;
906     store_serialize_int64(&s, store->gtid.seqno);
907     res = store_fnv32a(&s, sizeof(s), res);
908 
909     NODE_INFO("\n\n\tSeqno: %lld; state hash: %#010x\n",
910               (long long)store->gtid.seqno, res);
911 }
912 
913 static inline void
store_update_gtid(node_store_t * const store,const wsrep_gtid_t * ws_gtid)914 store_update_gtid(node_store_t* const store, const wsrep_gtid_t* ws_gtid)
915 {
916     assert(0 == wsrep_uuid_compare(&store->gtid.uuid, &ws_gtid->uuid));
917 
918     store->gtid.seqno++;
919 
920     if (store->gtid.seqno != ws_gtid->seqno)
921     {
922         NODE_FATAL("Out of order commit: expected %lld, got %lld",
923                    store->gtid.seqno, ws_gtid->seqno);
924         abort();
925     }
926 
927     static wsrep_seqno_t const period = 0x000fffff; /* ~1M */
928     if (0 == (store->gtid.seqno & period))
929     {
930         store_checksum_state(store);
931     }
932 }
933 
934 void
node_store_commit(node_store_t * const store,wsrep_trx_id_t const trx_id,const wsrep_gtid_t * const ws_gtid)935 node_store_commit(node_store_t*       const store,
936                   wsrep_trx_id_t      const trx_id,
937                   const wsrep_gtid_t* const ws_gtid)
938 {
939     assert(store);
940     assert(trx_id);
941 
942     struct store_trx_ctx* const trx = store_get_trx_ctx(store, trx_id);
943 
944     bool const check_read_view_snapshot =
945 #ifdef NDEBUG
946         !store->read_view_support;
947 #else
948     1;
949 #endif /* NDEBUG */
950 
951     STORE_MUTEX_LOCK(&store->gtid_mtx);
952 
953     store_update_gtid(store, ws_gtid);
954 
955     /* First loop is to check if we can commit all operations if provider
956      * does not support read view or for debugging puposes */
957     size_t i;
958     if (check_read_view_snapshot)
959     {
960         for (i = 0; i < trx->ops_num; i++)
961         {
962             struct store_trx_op* const op = &trx->ops[i];
963 
964             record_t from, to;
965             store_record_get(store->records, op->idx_from, &from);
966             store_record_get(store->records, op->idx_to,   &to);
967 
968             if (!store_record_equal(&op->rec_from, &from) ||
969                 !store_record_equal(&op->rec_to,   &to))
970             {
971                 /* read view changed since transaction was executed,
972                  * can't commit */
973                 assert(op->rec_from.version <= from.version);
974                 assert(op->rec_to.version <= to.version);
975                 if (op->rec_from.version == from.version)
976                     assert(op->rec_from.value == from.value);
977                 if (op->rec_to.version == to.version)
978                     assert(op->rec_to.value == to.value);
979                 if (store->read_view_support) abort();
980 
981                 store->read_view_fails++;
982 
983                 NODE_INFO("Read view changed at commit time, rollback trx");
984 
985                 goto error;
986             }
987         }
988     }
989 
990     /* Second loop is to actually modify the dataset */
991     for (i = 0; i < trx->ops_num; i++)
992     {
993         struct store_trx_op* const op = &trx->ops[i];
994 
995         record_t const new_record =
996             { .version = ws_gtid->seqno, .value = op->new_value };
997 
998         store_record_set(store->records, op->idx_to, &new_record);
999     }
1000 
1001 error:
1002     pthread_mutex_unlock(&store->gtid_mtx);
1003 
1004     store_free_trx_id(store, trx_id);
1005 }
1006 
1007 void
node_store_rollback(node_store_t * const store,wsrep_trx_id_t const trx_id)1008 node_store_rollback(node_store_t*  const store,
1009                     wsrep_trx_id_t const trx_id)
1010 {
1011     assert(store);
1012     assert(trx_id);
1013 
1014     store_free_trx_id(store, trx_id);
1015 }
1016 
1017 void
node_store_update_gtid(node_store_t * const store,const wsrep_gtid_t * const ws_gtid)1018 node_store_update_gtid(node_store_t*       const store,
1019                        const wsrep_gtid_t* const ws_gtid)
1020 {
1021     assert(store);
1022 
1023     STORE_MUTEX_LOCK(&store->gtid_mtx);
1024 
1025     store_update_gtid(store, ws_gtid);
1026 
1027     pthread_mutex_unlock(&store->gtid_mtx);
1028 }
1029 
1030 long
node_store_read_view_failures(node_store_t * const store)1031 node_store_read_view_failures(node_store_t* const store)
1032 {
1033     assert(store);
1034 
1035     long ret;
1036 
1037     STORE_MUTEX_LOCK(&store->gtid_mtx);
1038 
1039     ret = store->read_view_fails;;
1040 
1041     pthread_mutex_unlock(&store->gtid_mtx);
1042 
1043     return ret;
1044 }
1045