/* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; version 2 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include "store.h"

#include "log.h"

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h> // ptrdiff_t
#include <stdint.h> // uintptr_t
#include <stdlib.h> // abort()
#include <string.h> // memset()

#define DECLARE_SERIALIZE_INT(INTTYPE)                                  \
    static inline size_t                                                \
    store_serialize_##INTTYPE(void* const to, INTTYPE##_t const from)  \
    {                                                                   \
        memcpy(to, &from, sizeof(from)); /* for simplicity ignore endianness */ \
        return sizeof(from);                                            \
    }

DECLARE_SERIALIZE_INT(uint32);
DECLARE_SERIALIZE_INT(int64);

#define DECLARE_DESERIALIZE_INT(INTTYPE)                                \
    static inline size_t                                                \
    store_deserialize_##INTTYPE(INTTYPE##_t* const to, const void* const from) \
    {                                                                   \
        memcpy(to, from, sizeof(*to)); /* for simplicity ignore endianness */ \
        return sizeof(*to);                                             \
    }

DECLARE_DESERIALIZE_INT(uint32);
DECLARE_DESERIALIZE_INT(int64);
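/* The macros above expand into store_serialize_uint32()/store_serialize_int64()
 * and their deserialization counterparts: each copies the integer's bytes
 * to/from the buffer and returns the number of bytes processed. */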

typedef struct record
{
    wsrep_seqno_t version;
    uint32_t value;
    /* this order ensures that there is no padding between the members */
}
    record_t;

#define STORE_RECORD_SIZE \
    (sizeof(((record_t*)(NULL))->version) + sizeof(((record_t*)(NULL))->value))
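/* A serialized record is just the two members back to back; assuming the usual
 * 64-bit wsrep_seqno_t that is 8 + 4 = 12 bytes per record. */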

static inline size_t
store_record_set(void* const base,
                 size_t const index,
                 const record_t* const record)
{
    char* const position = (char*)base + index*STORE_RECORD_SIZE;
    memcpy(position, record, STORE_RECORD_SIZE);
    return STORE_RECORD_SIZE;
}

static inline size_t
store_record_get(const void* const base,
                 size_t const index,
                 record_t* const record)
{
    const char* const position = (const char*)base + index*STORE_RECORD_SIZE;
    memcpy(record, position, STORE_RECORD_SIZE);
    return STORE_RECORD_SIZE;
}

static inline bool
store_record_equal(const record_t* const lhs, const record_t* const rhs)
{
    return (lhs->version == rhs->version) && (lhs->value == rhs->value);
}

/* transaction context */
struct store_trx_op
{
    /* Normally all we'd need for the transaction context is the record index
     * and the new record value. Here we also save the read view snapshot
     * (rec_from & rec_to) to
     * 1. test provider certification correctness if the provider supports
     *    read views
     * 2. if not, detect conflicts at the store level. */
    record_t rec_from;
    record_t rec_to;
    uint32_t idx_from;
    uint32_t idx_to;
    uint32_t new_value;
    uint32_t size; /* nominal "size" of the operation to manipulate the
                    * on-the-wire writeset size. */
};

#define STORE_OP_SIZE (STORE_RECORD_SIZE + STORE_RECORD_SIZE +          \
                       sizeof(((struct store_trx_op*)NULL)->idx_from) + \
                       sizeof(((struct store_trx_op*)NULL)->idx_to) +   \
                       sizeof(((struct store_trx_op*)NULL)->new_value) + \
                       sizeof(((struct store_trx_op*)NULL)->size))
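/* With 12-byte records (see above) a serialized op is 2*12 + 4*4 = 40 bytes;
 * the buffer actually appended per op is store->op_size bytes, i.e. this value
 * or ws_size/operations, whichever is larger (see node_store_open()). */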

struct store_trx_ctx
{
    wsrep_gtid_t rv_gtid;
    size_t ops_num;
    struct store_trx_op* ops;
};

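/* appends one op slot (zeroed in debug builds) to the transaction context;
 * returns true on allocation failure, false on success */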
static inline bool
store_trx_add_op(struct store_trx_ctx* const trx)
{
    struct store_trx_op* const new_ops =
        realloc(trx->ops, sizeof(struct store_trx_op)*(trx->ops_num + 1));

    if (new_ops)
    {
        trx->ops = new_ops;
#ifndef NDEBUG
        memset(&trx->ops[trx->ops_num], 0, sizeof(*trx->ops));
#endif
        trx->ops_num++;
    }

    return (NULL == new_ops);
}

struct store_trx_entry
{
    bool used;
    struct store_trx_ctx ctx;
};

typedef wsrep_uuid_t member_t;

struct node_store
{
    wsrep_gtid_t gtid;
    pthread_mutex_t gtid_mtx;
    wsrep_trx_id_t trx_id;
    pthread_mutex_t trx_id_mtx;
    char* snapshot;
    member_t* members;
    void* records;
    size_t op_size;
    long read_view_fails;
    uint32_t members_num;
    uint32_t records_num;
    uint32_t entries_mask;
    bool read_view_support; // read view support by cluster
    /* trx pool piggybacked */
};

node_store_t*
node_store_open(const struct node_options* const opts)
{
    /* make the size of trx pool the next highest power of 2 over the total
     * number of workers */
    uint32_t trx_pool_mask = (uint32_t)(opts->masters + opts->slaves);
    if (trx_pool_mask > 0)
    {
        trx_pool_mask -= 1;
        trx_pool_mask |= trx_pool_mask >> 1;
        trx_pool_mask |= trx_pool_mask >> 2;
        trx_pool_mask |= trx_pool_mask >> 4;
        trx_pool_mask |= trx_pool_mask >> 8;
        trx_pool_mask |= trx_pool_mask >> 16;
    }
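    /* The bit-smearing above rounds down to (2^n - 1): e.g. 6 workers ->
     * 6 - 1 = 5 (101b) -> mask 7 (111b) -> a pool of 8 entries. */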
    assert(((trx_pool_mask + 1) & trx_pool_mask) == 0); // 2^n - 1

    size_t const desired_op_size = (size_t)(opts->ws_size/opts->operations);
    size_t const op_size = (desired_op_size > STORE_OP_SIZE ?
                            desired_op_size : STORE_OP_SIZE);

    /* since the number of workers will never change, we can allocate trx pool
     * together with the main store struct */
    size_t const store_alloc_size = sizeof(struct node_store) +
        /* op_size - additional buffer for op serialization per trx */
        (sizeof(struct store_trx_entry) + op_size)*(trx_pool_mask + 1);

    struct node_store* const ret = malloc(store_alloc_size);

    if (ret)
    {
        memset(ret, 0, store_alloc_size);
        ret->records = malloc((size_t)opts->records * STORE_RECORD_SIZE);

        if (ret->records)
        {
            ret->gtid = WSREP_GTID_UNDEFINED;
            pthread_mutex_init(&ret->gtid_mtx, NULL);
            pthread_mutex_init(&ret->trx_id_mtx, NULL);
            ret->op_size = op_size;
            ret->records_num = (uint32_t)opts->records;
            ret->entries_mask = trx_pool_mask;

            uint32_t i;
            for (i = 0; i < ret->records_num; i++)
            {
                /* keep state in serialized form for easy snapshotting */
                struct record const record = { WSREP_SEQNO_UNDEFINED, i };
                store_record_set(ret->records, i, &record);
            }

            return ret;
        }
        else
        {
            free(ret);
        }
    }

    return NULL;
}

void
node_store_close(struct node_store* const store)
{
    assert(store);
    assert(store->records);
    pthread_mutex_destroy(&store->gtid_mtx);
    pthread_mutex_destroy(&store->trx_id_mtx);
    free(store->records);
    free(store->members);
    free(store);
}

#define STORE_MUTEX_LOCK(mtx)                                   \
    {                                                           \
        int err = pthread_mutex_lock(mtx);                      \
        if (err)                                                \
        {                                                       \
            NODE_FATAL("Failed to lock " #mtx ": %d (%s)",      \
                       err, strerror(err));                     \
            abort();                                            \
        }                                                       \
    }

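/* Each slot in the piggybacked trx pool is a store_trx_entry immediately
 * followed by op_size bytes of scratch buffer; node_store_execute() serializes
 * the writeset payload into that buffer (addressed there as trx + 1). */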
static inline struct store_trx_entry*
store_get_trx_entry(struct node_store* const store, wsrep_trx_id_t const trx_id)
{
    return (struct store_trx_entry*)
        ((char*)(store + 1) + (trx_id & store->entries_mask)*
         (sizeof(struct store_trx_entry) + store->op_size));
}

static inline struct store_trx_ctx*
store_get_trx_ctx(struct node_store* const store, wsrep_trx_id_t const trx_id)
{
    return &(store_get_trx_entry(store, trx_id)->ctx);
}

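/* Allocates a transaction ID by scanning the pool for a free slot under
 * trx_id_mtx. Since the pool is at least as large as the number of workers
 * (see node_store_open()) and each worker is assumed to hold at most one
 * transaction at a time, the loop always terminates. */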
static inline wsrep_trx_id_t
store_new_trx_id(struct node_store* const store)
{
    wsrep_trx_id_t ret;
    struct store_trx_entry* trx;

    STORE_MUTEX_LOCK(&store->trx_id_mtx);

    do
    {
        store->trx_id++;
        trx = store_get_trx_entry(store, store->trx_id);
    }
    while (trx->used);
    trx->used = true;
    ret = store->trx_id;

    pthread_mutex_unlock(&store->trx_id_mtx);

    memset(&trx->ctx, 0, sizeof(trx->ctx));

    return ret;
}

static inline void
store_free_trx_id(struct node_store* const store, wsrep_trx_id_t const trx_id)
{
    struct store_trx_entry* const trx = store_get_trx_entry(store, trx_id);
    assert(trx->used);
    free(trx->ctx.ops);

    STORE_MUTEX_LOCK(&store->trx_id_mtx);

    trx->used = false;

    pthread_mutex_unlock(&store->trx_id_mtx);
}

/**
 * deserializes membership from snapshot */
static int
store_new_members(const char* ptr, const char* const endptr,
                  uint32_t* const num, member_t** const memb)
{
    ptr += store_deserialize_uint32(num, ptr);

    if (*num < 2)
    {
        NODE_ERROR("Bogus number of members %u", *num);
        return -1;
    }

    int ret = (int)sizeof(*num);

    size_t const msize = sizeof(member_t) * *num;
    if ((endptr - ptr) < (ptrdiff_t)msize)
    {
        NODE_ERROR("State snapshot does not contain all membership: "
                   "%zd < %zu", endptr - ptr, msize);
        return -1;
    }

    *memb = calloc(*num, sizeof(member_t));
    if (!*memb)
    {
        NODE_ERROR("Could not allocate new membership");
        return -ENOMEM;
    }

    memcpy(*memb, ptr, msize);

    return ret + (int)msize;
}

/**
 * deserializes records from snapshot */
static int
store_new_records(const char* ptr, const char* const endptr,
                  uint32_t* const num, void** const rec)
{
    ptr += store_deserialize_uint32(num, ptr);

    int ret = (int)sizeof(*num);
    if (!*num)
    {
        *rec = NULL;
        return ret;
    }

    size_t const rsize = STORE_RECORD_SIZE * *num;
    if ((endptr - ptr) < (ptrdiff_t)rsize)
    {
        NODE_ERROR("State snapshot does not contain all records: "
                   "%zd < %zu", endptr - ptr, rsize);
        return -1;
    }

    *rec = malloc(rsize);
    if (!*rec)
    {
        NODE_ERROR("Could not allocate new records");
        return -ENOMEM;
    }

    memcpy(*rec, ptr, rsize);

    return ret + (int)rsize;
}

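/* Snapshot layout, as produced by node_store_acquire_state() below:
 *   GTID in string form, '\0'-terminated
 *   uint32  number of members, followed by that many member_t entries
 *   1 byte  read view support flag
 *   uint32  number of records, followed by that many serialized records */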
int
node_store_init_state(struct node_store* const store,
                      const void* const state,
                      size_t const state_len)
{
    /* First, deserialize and prepare new state */
    if (state_len <= sizeof(member_t)*2 /* at least two members */ +
        WSREP_UUID_STR_LEN + 1 /* : */ + 1 /* seqno */ + 1 /* \0 */)
    {
        NODE_ERROR("State snapshot too short: %zu", state_len);
        return -1;
    }

    wsrep_gtid_t state_gtid;
    int ret;
    ret = wsrep_gtid_scan(state, state_len, &state_gtid);
    if (ret < 0)
    {
        char state_str[WSREP_GTID_STR_LEN + 1] = { 0, };
        memcpy(state_str, state, sizeof(state_str) - 1);
        NODE_ERROR("Could not find valid GTID in the received data: %s",
                   state_str);
        return -1;
    }

    ret++; /* \0 */
    if ((state_len - (size_t)ret) < sizeof(uint32_t))
    {
        NODE_ERROR("State snapshot does not contain the number of members");
        return -1;
    }

    const char* ptr = ((char*)state);
    const char* const endptr = ptr + state_len;
    ptr += ret;

    uint32_t m_num;
    member_t* new_members;
    ret = store_new_members(ptr, endptr, &m_num, &new_members);
    if (ret < 0)
    {
        return ret;
    }
    ptr += ret;

    bool const read_view_support = ptr[0];
    ptr += 1;

    uint32_t r_num;
    void* new_records;
    ret = store_new_records(ptr, endptr, &r_num, &new_records);
    if (ret < 0)
    {
        free(new_members);
        return ret;
    }
    ptr += ret;

    STORE_MUTEX_LOCK(&store->gtid_mtx);

    /* just a sanity check */
    if (0 == wsrep_uuid_compare(&state_gtid.uuid, &store->gtid.uuid) &&
        state_gtid.seqno < store->gtid.seqno)
    {
        NODE_ERROR("Received snapshot that is in the past: my seqno %lld,"
                   " received seqno: %lld",
                   (long long)store->gtid.seqno, (long long)state_gtid.seqno);
        free(new_members);
        free(new_records);
        ret = -1;
    }
    else
    {
        free(store->members);
        store->members_num = m_num;
        store->members = new_members;
        free(store->records);
        store->records_num = r_num;
        store->records = new_records;
        store->gtid = state_gtid;
        store->read_view_support = read_view_support;
        ret = 0;
    }

    pthread_mutex_unlock(&store->gtid_mtx);

    return ret;
}

int
node_store_acquire_state(node_store_t* const store,
                         const void** const state,
                         size_t* const state_len)
{
    int ret = 0;

    STORE_MUTEX_LOCK(&store->gtid_mtx);

    if (!store->snapshot)
    {
        size_t const memb_len = store->members_num * sizeof(member_t);
        size_t const rec_len = store->records_num * STORE_RECORD_SIZE;
        size_t const buf_len = WSREP_GTID_STR_LEN + 1
            + sizeof(uint32_t) + memb_len
            + 1 /* read view support */
            + sizeof(uint32_t) + rec_len;

        store->snapshot = malloc(buf_len);

        if (store->snapshot)
        {
            char* ptr = store->snapshot;

            /* state GTID */
            ret = wsrep_gtid_print(&store->gtid, ptr, buf_len);
            if (ret > 0)
            {
                NODE_INFO("");
                assert((size_t)ret < buf_len);

                ptr[ret] = '\0';
                ret++;
                ptr += ret;
                assert((size_t)ret < buf_len);

                /* membership */
                ptr += store_serialize_uint32(ptr, store->members_num);
                ret += (int)sizeof(uint32_t);
                assert((size_t)ret + memb_len < buf_len);
                memcpy(ptr, store->members, memb_len);
                ptr += memb_len;
                ret += (int)memb_len;
                assert((size_t)ret + sizeof(uint32_t) <= buf_len);

                /* read view support */
                ptr[0] = store->read_view_support;
                ptr += 1;
                ret += 1;

                /* records */
                ptr += store_serialize_uint32(ptr, store->records_num);
                ret += (int)sizeof(uint32_t);
                assert((size_t)ret + rec_len < buf_len);
                memcpy(ptr, store->records, rec_len);
                ret += (int)rec_len;
                assert((size_t)ret <= buf_len);
            }
            else
            {
                NODE_ERROR("Failed to record GTID: %d (%s)", ret, strerror(-ret));
                free(store->snapshot);
                store->snapshot = 0;
            }
        }
        else
        {
            NODE_ERROR("Failed to allocate snapshot buffer of size %zu", buf_len);
            ret = -ENOMEM;
        }
    }
    else
    {
        assert(0); /* provider should prevent such situation */
        ret = -EAGAIN;
    }

    pthread_mutex_unlock(&store->gtid_mtx);

    if (ret > 0)
    {
        NODE_INFO("\n\nPrepared snapshot of %u records\n\n", store->records_num);
        *state = store->snapshot;
        *state_len = (size_t)ret;
        ret = 0;
    }

    return ret;
}

void
node_store_release_state(node_store_t* const store)
{
    STORE_MUTEX_LOCK(&store->gtid_mtx);

    assert(store->snapshot);
    free(store->snapshot);
    store->snapshot = 0;

    pthread_mutex_unlock(&store->gtid_mtx);
}

int
node_store_update_membership(struct node_store* const store,
                             const wsrep_view_info_t* const v)
{
    assert(store);
    assert(WSREP_VIEW_PRIMARY == v->status);
    assert(v->memb_num > 0);

    STORE_MUTEX_LOCK(&store->gtid_mtx);

    bool const continuation = v->state_id.seqno == store->gtid.seqno + 1 &&
        0 == wsrep_uuid_compare(&v->state_id.uuid, &store->gtid.uuid);

    bool const initialization = WSREP_SEQNO_UNDEFINED == store->gtid.seqno &&
        0 == wsrep_uuid_compare(&WSREP_UUID_UNDEFINED, &store->gtid.uuid);

    if (!(continuation || initialization))
    {
        char store_str[WSREP_GTID_STR_LEN + 1] = { 0, };
        wsrep_gtid_print(&store->gtid, store_str, sizeof(store_str));
        char view_str[WSREP_GTID_STR_LEN + 1] = { 0, };
        wsrep_gtid_print(&v->state_id, view_str, sizeof(view_str));

        NODE_FATAL("Attempt to initialize store GTID from incompatible view:\n"
                   "\tstore: %s\n"
                   "\tview: %s",
                   store_str, view_str);
        abort();
    }

    wsrep_uuid_t* const new_members = calloc(sizeof(wsrep_uuid_t),
                                             (size_t)v->memb_num);
    if (!new_members)
    {
        NODE_FATAL("Could not allocate new members array");
        abort();
    }

    int i;
    for (i = 0; i < v->memb_num; i++)
    {
        new_members[i] = v->members[i].id;
    }

    /* REPLICATION: at this point we should compare old and new memberships and
     *              rollback all streaming transactions from the partitioned
     *              members, if any. But we don't support it in this program yet.
     */

    free(store->members);

    store->members = new_members;
    store->members_num = (uint32_t)v->memb_num;
    store->gtid = v->state_id;
    store->read_view_support = (v->capabilities & WSREP_CAP_SNAPSHOT);

    pthread_mutex_unlock(&store->gtid_mtx);

    return 0;
}

void
node_store_gtid(struct node_store* const store,
                wsrep_gtid_t* const gtid)
{
    assert(store);

    STORE_MUTEX_LOCK(&store->gtid_mtx);

    *gtid = store->gtid;

    pthread_mutex_unlock(&store->gtid_mtx);
}


static inline void
store_serialize_op(void* const buf, const struct store_trx_op* const op)
{
    char* ptr = buf;
    ptr += store_record_set(ptr, 0, &op->rec_from);
    ptr += store_record_set(ptr, 0, &op->rec_to);
    ptr += store_serialize_uint32(ptr, op->idx_from);
    ptr += store_serialize_uint32(ptr, op->idx_to);
    ptr += store_serialize_uint32(ptr, op->new_value);
    store_serialize_uint32(ptr, op->size);
}

static inline void
store_deserialize_op(struct store_trx_op* const op, const void* const buf)
{
    const char* ptr = buf;
    ptr += store_record_get(ptr, 0, &op->rec_from);
    ptr += store_record_get(ptr, 0, &op->rec_to);
    ptr += store_deserialize_uint32(&op->idx_from, ptr);
    ptr += store_deserialize_uint32(&op->idx_to, ptr);
    ptr += store_deserialize_uint32(&op->new_value, ptr);
    store_deserialize_uint32(&op->size, ptr);
}

static inline void
store_serialize_gtid(void* const buf, const wsrep_gtid_t* const gtid)
{
    char* ptr = buf;
    memcpy(ptr, &gtid->uuid, sizeof(gtid->uuid));
    ptr += sizeof(gtid->uuid);
    store_serialize_int64(ptr, gtid->seqno);
}

static inline void
store_deserialize_gtid(wsrep_gtid_t* const gtid, const void* const buf)
{
    const char* ptr = buf;
    memcpy(&gtid->uuid, ptr, sizeof(gtid->uuid));
    ptr += sizeof(gtid->uuid);
    store_deserialize_int64(&gtid->seqno, ptr);
}

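/* A serialized GTID is the raw UUID bytes followed by the seqno as int64; with
 * the standard 16-byte wsrep_uuid_t that is 24 bytes on the wire. */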
#define STORE_GTID_SIZE (sizeof(((wsrep_gtid_t*)(NULL))->uuid) + sizeof(int64_t))

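/* Executes one local transaction operation: picks two random records, takes a
 * snapshot of both for conflict detection, and appends the touched keys and
 * the serialized operation (plus, on the first op, the read view GTID) to the
 * provider writeset. Returns 0 on success, a negative errno or a positive
 * wsrep_status_t error otherwise. */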
int
node_store_execute(node_store_t* const store,
                   wsrep_t* const wsrep,
                   wsrep_ws_handle_t* const ws_handle)
{
    assert(store);

    if (0 == ws_handle->trx_id)
    {
        assert(sizeof(ws_handle->trx_id) >= sizeof(uintptr_t));
        ws_handle->trx_id = store_new_trx_id(store);
    }

    struct store_trx_ctx* trx = store_get_trx_ctx(store, ws_handle->trx_id);
    if (store_trx_add_op(trx)) return -ENOMEM;
    struct store_trx_op* const op = &trx->ops[trx->ops_num - 1];

    STORE_MUTEX_LOCK(&store->gtid_mtx);

    if (1 == trx->ops_num)
    {
        /* First operation, save ID of the read view of the transaction */
        trx->rv_gtid = store->gtid;
    }

    /* Transaction op: copy value from one random record to another... */
    op->idx_from = (uint32_t)rand() % store->records_num;
    op->idx_to = (uint32_t)rand() % store->records_num;
    store_record_get(store->records, op->idx_from, &op->rec_from);
    store_record_get(store->records, op->idx_to, &op->rec_to);

    pthread_mutex_unlock(&store->gtid_mtx);

    wsrep_status_t ret = WSREP_TRX_FAIL;

    if (op->rec_from.version > trx->rv_gtid.seqno ||
        op->rec_to.version > trx->rv_gtid.seqno)
    {
        /* transaction read view changed, trx needs to be restarted */
#if 0
        NODE_INFO("Transaction read view changed: %lld -> %lld, returning %d",
                  (long long)trx->rv_gtid.seqno,
                  (long long)(op->rec_from.version > op->rec_to.version ?
                              op->rec_from.version : op->rec_to.version),
                  ret);
#endif
        goto error;
    }

    /* Transaction op: ... and modify it somehow, e.g. increment by 1 */
    op->new_value = op->rec_from.value + 1;

    if (1 == trx->ops_num) // first trx operation
    {
        /* REPLICATION: Since this application does not implement record locks,
         *              it needs to establish a read view for each transaction
         *              for proper conflict detection and transaction isolation.
         *              Otherwise we'd need to implement record versioning. */
        if (store->read_view_support)
        {
            ret = wsrep->assign_read_view(wsrep, ws_handle, &trx->rv_gtid);
            if (ret)
            {
                NODE_ERROR("wsrep::assign_read_view(%lld) failed: %d",
                           (long long)trx->rv_gtid.seqno, ret);
                goto error;
            }
        }

        /* Record read view in the writeset for debugging purposes */
        assert(store->op_size > STORE_GTID_SIZE);
        store_serialize_gtid(trx + 1, &trx->rv_gtid);
        wsrep_buf_t ws = { .ptr = trx + 1, .len = STORE_GTID_SIZE };
        ret = wsrep->append_data(wsrep, ws_handle, &ws, 1, WSREP_DATA_ORDERED,
                                 true);
        if (ret)
        {
            NODE_ERROR("wsrep::append_data(rv_gtid) failed: %d", ret);
            goto error;
        }
    }

    /* REPLICATION: append keys touched by the operation
     *
     * NOTE: depending on data access granularity some applications may require
     *       multipart keys, e.g. <schema>:<table>:<row> in a SQL database.
     *       Single part keys match hashtables and key-value stores.
     *       Below we have two different single-part keys which reference two
     *       different records. */
    uint32_t key_val;
    wsrep_buf_t key_part = { .ptr = &key_val, .len = sizeof(key_val) };
    wsrep_key_t ws_key = { .key_parts = &key_part, .key_parts_num = 1 };

    /* REPLICATION: Key 1 - the key of the source, unchanged record */
    store_serialize_uint32(&key_val, op->idx_from);
    ret = wsrep->append_key(wsrep, ws_handle,
                            &ws_key,
                            1, /* single key */
                            WSREP_KEY_REFERENCE,
                            true /* provider shall make a copy of the key */);
    if (ret)
    {
        NODE_ERROR("wsrep::append_key(REFERENCE) failed: %d", ret);
        goto error;
    }

    /* REPLICATION: Key 2 - the key of the record we want to update */
    store_serialize_uint32(&key_val, op->idx_to);
    ret = wsrep->append_key(wsrep, ws_handle,
                            &ws_key,
                            1, /* single key */
                            WSREP_KEY_UPDATE,
                            true /* provider shall make a copy of the key */);
    if (ret)
    {
        NODE_ERROR("wsrep::append_key(UPDATE) failed: %d", ret);
        goto error;
    }

    /* REPLICATION: append transaction operation to the "writeset"
     *              (WS buffer was allocated together with trx context above) */
    assert(store->op_size >= STORE_OP_SIZE);
    assert(store->op_size == (uint32_t)store->op_size);
    op->size = (uint32_t)store->op_size;
    store_serialize_op(trx + 1, op);
    wsrep_buf_t ws = { .ptr = trx + 1, .len = store->op_size };
    ret = wsrep->append_data(wsrep, ws_handle, &ws, 1, WSREP_DATA_ORDERED, true);

    if (!ret) return 0;

    NODE_ERROR("wsrep::append_data(op) failed: %d", ret);

error:
    store_free_trx_id(store, ws_handle->trx_id);

    return ret;
}

int
node_store_apply(node_store_t* const store,
                 wsrep_trx_id_t* const trx_id,
                 const wsrep_buf_t* const ws)
{
    assert(store);
    (void)store;

    *trx_id = store_new_trx_id(store);
    struct store_trx_ctx* const trx = store_get_trx_ctx(store, *trx_id);

    /* prepare trx context for commit */
    const char* ptr = ws->ptr;
    size_t left = ws->len;

    /* at least one operation should be there */
    assert(left >= STORE_GTID_SIZE + STORE_OP_SIZE);

    if (left >= STORE_GTID_SIZE)
    {
        store_deserialize_gtid(&trx->rv_gtid, ptr);
        left -= STORE_GTID_SIZE;
        ptr += STORE_GTID_SIZE;
    }

    while (left >= STORE_OP_SIZE)
    {
        if (store_trx_add_op(trx))
        {
            store_free_trx_id(store, *trx_id); /* "rollback": release resources */
            return -ENOMEM;
        }
        struct store_trx_op* const op = &trx->ops[trx->ops_num - 1];

        store_deserialize_op(op, ptr);
        assert(op->idx_to <= store->records_num);

        left -= op->size;
        ptr += op->size;
    }

    if (left != 0)
    {
        NODE_FATAL("Failed to process last (%d/%zu) bytes of the writeset.",
                   (int)left, ws->len);
        abort();
    }

    return 0;
}

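/* 32-bit FNV-1a hash (offset basis 2166136261, prime 16777619), used below to
 * checksum the whole store state so that nodes can be compared via their log
 * output. */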
static uint32_t const store_fnv32_seed = 2166136261;

static inline uint32_t
store_fnv32a(const void* buf, size_t const len, uint32_t seed)
{
    static uint32_t const fnv32_prime = 16777619;
    const uint8_t* bp = (const uint8_t*)buf;
    const uint8_t* const be = bp + len;

    while (bp < be)
    {
        seed ^= *bp++;
        seed *= fnv32_prime;
    }

    return seed;
}


static void
store_checksum_state(node_store_t* store)
{
    uint32_t res = store_fnv32_seed;
    uint32_t i;

    for (i = 0; i < store->members_num; i++)
    {
        res = store_fnv32a(&store->members[i], sizeof(*store->members), res);
    }

    res = store_fnv32a(store->records, store->records_num * STORE_RECORD_SIZE,
                       res);

    res = store_fnv32a(&store->gtid.uuid, sizeof(store->gtid.uuid), res);

    wsrep_seqno_t s;
    store_serialize_int64(&s, store->gtid.seqno);
    res = store_fnv32a(&s, sizeof(s), res);

    NODE_INFO("\n\n\tSeqno: %lld; state hash: %#010x\n",
              (long long)store->gtid.seqno, res);
}

static inline void
store_update_gtid(node_store_t* const store, const wsrep_gtid_t* ws_gtid)
{
    assert(0 == wsrep_uuid_compare(&store->gtid.uuid, &ws_gtid->uuid));

    store->gtid.seqno++;

    if (store->gtid.seqno != ws_gtid->seqno)
    {
        NODE_FATAL("Out of order commit: expected %lld, got %lld",
                   (long long)store->gtid.seqno, (long long)ws_gtid->seqno);
        abort();
    }

    static wsrep_seqno_t const period = 0x000fffff; /* ~1M */
    if (0 == (store->gtid.seqno & period))
    {
        store_checksum_state(store);
    }
}

void
node_store_commit(node_store_t* const store,
                  wsrep_trx_id_t const trx_id,
                  const wsrep_gtid_t* const ws_gtid)
{
    assert(store);
    assert(trx_id);

    struct store_trx_ctx* const trx = store_get_trx_ctx(store, trx_id);

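    /* Verify the recorded read view snapshot at commit time: always in debug
     * builds, otherwise only when the provider could not certify against a
     * read view for us. */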
    bool const check_read_view_snapshot =
#ifdef NDEBUG
        !store->read_view_support;
#else
        1;
#endif /* NDEBUG */

    STORE_MUTEX_LOCK(&store->gtid_mtx);

    store_update_gtid(store, ws_gtid);

    /* First loop is to check whether we can commit all operations, if the
     * provider does not support read views or for debugging purposes */
    size_t i;
    if (check_read_view_snapshot)
    {
        for (i = 0; i < trx->ops_num; i++)
        {
            struct store_trx_op* const op = &trx->ops[i];

            record_t from, to;
            store_record_get(store->records, op->idx_from, &from);
            store_record_get(store->records, op->idx_to, &to);

            if (!store_record_equal(&op->rec_from, &from) ||
                !store_record_equal(&op->rec_to, &to))
            {
                /* read view changed since transaction was executed,
                 * can't commit */
                assert(op->rec_from.version <= from.version);
                assert(op->rec_to.version <= to.version);
                if (op->rec_from.version == from.version)
                    assert(op->rec_from.value == from.value);
                if (op->rec_to.version == to.version)
                    assert(op->rec_to.value == to.value);
                if (store->read_view_support) abort();

                store->read_view_fails++;

                NODE_INFO("Read view changed at commit time, rollback trx");

                goto error;
            }
        }
    }

    /* Second loop is to actually modify the dataset */
    for (i = 0; i < trx->ops_num; i++)
    {
        struct store_trx_op* const op = &trx->ops[i];

        record_t const new_record =
            { .version = ws_gtid->seqno, .value = op->new_value };

        store_record_set(store->records, op->idx_to, &new_record);
    }

error:
    pthread_mutex_unlock(&store->gtid_mtx);

    store_free_trx_id(store, trx_id);
}

void
node_store_rollback(node_store_t* const store,
                    wsrep_trx_id_t const trx_id)
{
    assert(store);
    assert(trx_id);

    store_free_trx_id(store, trx_id);
}

void
node_store_update_gtid(node_store_t* const store,
                       const wsrep_gtid_t* const ws_gtid)
{
    assert(store);

    STORE_MUTEX_LOCK(&store->gtid_mtx);

    store_update_gtid(store, ws_gtid);

    pthread_mutex_unlock(&store->gtid_mtx);
}

long
node_store_read_view_failures(node_store_t* const store)
{
    assert(store);

    long ret;

    STORE_MUTEX_LOCK(&store->gtid_mtx);

    ret = store->read_view_fails;

    pthread_mutex_unlock(&store->gtid_mtx);

    return ret;
}