1 /* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include <rpc/rpc.h>
24 #include <assert.h>
25
26 #include <stdlib.h>
27
28 #include "xcom_common.h"
29 #include "simset.h"
30 #include "xcom_vp.h"
31 #include "xcom_cache.h"
32 #include "task.h"
33 #include "node_no.h"
34 #include "server_struct.h"
35 #include "xcom_detector.h"
36 #include "site_struct.h"
37 #include "xcom_transport.h"
38 #include "xcom_base.h"
39 #include "pax_msg.h"
40 #include "xcom_vp_str.h"
41 #include "synode_no.h"
42 #include "task.h"
43 #include "task_debug.h"
44 #include "site_def.h"
45 #include "bitset.h"
46 #include "app_data.h"
47 #include "simset.h"
48 #include "xcom_cfg.h"
49
50 #define DBG_CACHE_SIZE 0
51
52 /* Protect at least MIN_CACHED * (number of nodes) pax_machine objects from deallocation by shrink_cache */
53 #define MIN_CACHED 10
54
55 /* {{{ Paxos machine cache */
56
/* A cache slot: an LRU chain link plus the Paxos instance it owns. */
struct lru_machine {
	linkage lru_link; /* Position in protected_lru or probation_lru */
	pax_machine pax;  /* The Paxos state machine stored in this slot */
};
61
62 static synode_no last_removed_cache;
63
/*
  Report whether synode x is at or below the highest synode recorded as
  evicted from the cache, i.e. whether its pax_machine may already have
  been recycled.  Only synodes in the same group as last_removed_cache
  are considered removed.
*/
int was_removed_from_cache(synode_no x)
{
	ADD_EVENTS(
	    add_event(string_arg("x "));
	    add_synode_event(x);
	    add_event(string_arg("last_removed_cache "));
	    add_synode_event(last_removed_cache);
	);
	/*
	  What to do with requests from nodes that have a different group ID?
	  Should we just ignore them, as we do with the current code,
	  or should we do something about it?
	*/
	if (last_removed_cache.group_id != x.group_id)
		return 0;
	return !synode_gt(x, last_removed_cache);
}
79
80 #define BUCKETS (CACHED)
81
82 static linkage pax_hash[BUCKETS]; /* Hash link table */
83 static lru_machine cache[CACHED]; /* The Paxos instances, plus a link for the LRU chain */
84 static linkage protected_lru = {0,&protected_lru, &protected_lru}; /* Head of LRU chain of cache hits */
85 static linkage probation_lru = {0,&probation_lru, &probation_lru}; /* Head of LRU chain of cache misses */
86
87 static pax_machine *init_pax_machine(pax_machine *p, lru_machine *lru, synode_no synode);
88
hash_init()89 static void hash_init()
90 {
91 unsigned int i = 0;
92 for (i = 0; i < BUCKETS; i++) {
93 link_init(&pax_hash[i], type_hash("pax_machine"));
94 }
95 }
96
97 extern void hexdump(void *p, long length);
98
99 #if 0
100 #define FNVSTART 0x811c9dc5
101
102 /* Fowler-Noll-Vo type multiplicative hash */
103 static uint32_t fnv_hash(unsigned char *buf, size_t length, uint32_t sum)
104 {
105 size_t i = 0;
106 for (i = 0; i < length; i++) {
107 sum = sum * (uint32_t)0x01000193 ^ (uint32_t)buf[i];
108 }
109 return sum;
110 }
111
112 static unsigned int synode_hash(synode_no synode)
113 {
114 /* Need to hash three fields separately, since struct may contain padding with
115 undefined values */
116 return fnv_hash((unsigned char *) & synode.node, sizeof(synode.node),
117 fnv_hash((unsigned char *) & synode.group_id, sizeof(synode.group_id),
118 fnv_hash((unsigned char *) & synode.msgno, sizeof(synode.msgno), FNVSTART)))
119
120 % BUCKETS;
121 }
122 #else
/* Map a synode to a bucket index in [0, BUCKETS). */
static unsigned int synode_hash(synode_no synode)
{
	/* Hash the three fields separately, since the struct may contain
	   padding with undefined values */
	unsigned int folded = (unsigned int)
	    (4711 * synode.node + 5 * synode.group_id + synode.msgno);
	return folded % BUCKETS;
}
130
131 #endif
132
/* Insert p into the hash bucket selected by its synode; returns p. */
static pax_machine *hash_in(pax_machine *p)
{
	linkage *bucket = &pax_hash[synode_hash(p->synode)];
	MAY_DBG(FN; PTREXP(p);
	    SYCEXP(p->synode);
	);
	link_into(&p->hash_link, bucket);
	return p;
}
141
/* Unlink p from its hash bucket chain and return it. */
static pax_machine *hash_out(pax_machine *p)
{
	MAY_DBG(FN; PTREXP(p);
	    SYCEXP(p->synode);
	);
	/* NOTE(review): the cast presumably relies on hash_link being at a
	   known offset inside pax_machine — confirm against pax_machine's
	   declaration */
	return (pax_machine * )link_out(&p->hash_link);
}
149
/*
  Look up the pax_machine for synode in the hash table.
  Returns NULL when no machine for that synode is currently cached.
*/
pax_machine *hash_get(synode_no synode)
{
	/* static pax_machine *cached_machine = NULL; */
	/* Chain of all machines whose synode hashes to this bucket */
	linkage * bucket = &pax_hash[synode_hash(synode)];

	/* if(cached_machine && synode_eq(synode, cached_machine->synode)) */
	/* return cached_machine; */

	/* FWD_ITER walks the chain, exposing each element as link_iter */
	FWD_ITER(bucket, pax_machine,
	    if (synode_eq(link_iter->synode, synode)){
		    /* cached_machine = link_iter; */
		    return link_iter;
	    }
	)
	;
	return NULL;
}
167
168 #if 0
169 static int is_noop(synode_no synode)
170 {
171 if (is_cached(synode)) {
172 pax_machine * m = get_cache(synode);
173 return m->learner.msg && m->learner.msg->msg_type == no_op;
174 } else {
175 return 0;
176 }
177 }
178 #endif
179
/*
  Get a machine for (re)use.
  The machines are statically allocated, and organized in two lists.
  probation_lru is the free list.
  protected_lru tracks the machines that are currently in the cache in
  least recently used order.
*/
/*
  Get a machine for (re)use.
  Prefers the free list (probation_lru); only when that is empty does it
  scan protected_lru for the first machine that is not locked.
  Returns the chosen slot; asserts that one non-busy machine exists.
*/
static lru_machine *lru_get()
{
	lru_machine * retval = NULL;
	if (!link_empty(&probation_lru)) {
		/* A free machine is available: take the first one */
		retval = (lru_machine * ) link_first(&probation_lru);
	} else {
		/* Find the first non-busy instance in the LRU */
		FWD_ITER(&protected_lru, lru_machine,
		    if (!is_busy_machine(&link_iter->pax)) {
			    retval = link_iter;
			    /* Since this machine is in the cache, we need to update
			       last_removed_cache */
			    last_removed_cache = retval->pax.synode;
			    break;
		    }
		)
	}
	/* Every machine being busy would be fatal: we must always find one */
	assert(retval && !is_busy_machine(&retval->pax));
	return retval;
}
207
lru_touch_hit(pax_machine * p)208 static lru_machine *lru_touch_hit(pax_machine *p)
209 {
210 lru_machine * lru = p->lru;
211 link_into(link_out(&lru->lru_link), &protected_lru);
212 return lru;
213 }
214
215
216 /* Initialize the message cache */
init_cache()217 void init_cache()
218 {
219 unsigned int i = 0;
220 link_init(&protected_lru, type_hash("lru_machine"));
221 link_init(&probation_lru, type_hash("lru_machine"));
222 hash_init();
223 for (i = 0; i < CACHED; i++) {
224 lru_machine * l = &cache[i];
225 link_init(&l->lru_link, type_hash("lru_machine"));
226 link_into(&l->lru_link, &probation_lru);
227 init_pax_machine(&l->pax, l, null_synode);
228 }
229 init_cache_size(); /* After cache has been intialized, size is 0 */
230 last_removed_cache = null_synode;
231 }
232
deinit_cache()233 void deinit_cache()
234 {
235 int i= 0;
236 /*
237 We reset the memory structures before claiming back memory.
238 Since deiniting the cache happens rarely - mostly when the
239 XCom thread terminates we are ok with doing it like this,
240 i.e., at the cost an additional loop and potential extra
241 allocations - before deallocating.
242
243 We do this to not clutter the execution flow and improve
244 readability and maintaintability by keeping the source code
245 for the deactivation routine simple and straightforward.
246 */
247 init_cache();
248 for (i= 0; i < CACHED; i++)
249 {
250 lru_machine *l= &cache[i];
251 pax_machine *p= &l->pax;
252 if (p->proposer.prep_nodeset)
253 {
254 free_bit_set(p->proposer.prep_nodeset);
255 p->proposer.prep_nodeset= NULL;
256 }
257 if (p->proposer.prop_nodeset)
258 {
259 free_bit_set(p->proposer.prop_nodeset);
260 p->proposer.prop_nodeset= NULL;
261 }
262 }
263 }
264
265 /* static synode_no log_tail; */
266
/*
  Look up the pax_machine for synode, creating one on a miss.
  Does NOT touch the machine's LRU position (see get_cache for that).
  On a miss, a slot is recycled via lru_get, removed from the hash
  table, re-initialized for this synode, and re-inserted.
*/
pax_machine *get_cache_no_touch(synode_no synode)
{
	pax_machine * retval = hash_get(synode);
	/* DBGOUT(FN; SYCEXP(synode); STREXP(task_name())); */
	MAY_DBG(FN; SYCEXP(synode); PTREXP(retval));
	if (!retval) {
		lru_machine * l = lru_get(); /* Need to know when it is safe to re-use... */
		MAY_DBG(FN; PTREXP(l);
		    COPY_AND_FREE_GOUT(dbg_pax_machine(&l->pax));
		);
		/* assert(l->pax.synode > log_tail); */

		retval = hash_out(&l->pax); /* Remove from hash table */
		init_pax_machine(retval, l, synode); /* Initialize */
		hash_in(retval); /* Insert in hash table again */
	}
	MAY_DBG(FN; SYCEXP(synode); PTREXP(retval));
	return retval;
}
286
get_cache(synode_no synode)287 pax_machine *get_cache(synode_no synode)
288 {
289 pax_machine * retval = get_cache_no_touch(synode);
290 lru_touch_hit(retval); /* Insert in protected_lru */
291 MAY_DBG(FN; SYCEXP(synode); PTREXP(retval));
292 return retval;
293 }
294
/*
  Decide whether the Paxos instance held by link_iter may be recycled.
  Returns nonzero when deallocation is safe, 0 when the instance must
  be kept.
*/
static inline int can_deallocate(lru_machine *link_iter)
{
	synode_no delivered_msg;
	site_def const *site = get_site_def();
	site_def const *dealloc_site = find_site_def(link_iter->pax.synode);

	/* If we have no site, or site was just installed, refuse deallocation */
	if (site == 0) {
		return 0;
	}
	/*
	  With the patch that was put in to ensure that nodes always see a
	  global view message when it joins, the node that joins may need
	  messages which are significantly behind the point where the node
	  joins (effectively starting with the latest config). So there is
	  a very real risk that a node which joined might find that those
	  messages had been removed, since all the other nodes had executed
	  past that point. This test effectively stops garbage collection
	  of old messages until the joining node has got a chance to tell
	  the others about its low water mark. If it has not done that
	  within DETECTOR_LIVE_TIMEOUT, it will be considered dead by the
	  other nodes anyway, and expelled.
	*/
	if ((site->install_time + DETECTOR_LIVE_TIMEOUT) > task_now()) {
		return 0;
	}
	/* Synode does not match any site, OK to deallocate */
	if (dealloc_site == 0) {
		return 1;
	}
	delivered_msg = get_min_delivered_msg(site);
	/* Missing info from some node, not OK */
	if (synode_eq(delivered_msg, null_synode)) {
		return 0;
	}
	/* Different group, or far enough behind the delivery low-water mark */
	if (link_iter->pax.synode.group_id != delivered_msg.group_id) {
		return 1;
	}
	return (link_iter->pax.synode.msgno + MIN_CACHED) < delivered_msg.msgno;
}
327
328 /*
329 Loop through the LRU (protected_lru) and deallocate objects until the size of
330 the cache is below the limit.
331 The freshly initialized objects are put into the probation_lru, so we can always start
332 scanning at the end of protected_lru.
333 lru_get will always look in probation_lru first.
334 */
/*
  Loop through the LRU (protected_lru) and deallocate objects until the size of
  the cache is below the limit.
  The freshly initialized objects are put into the probation_lru, so we can always start
  scanning at the end of protected_lru.
  lru_get will always look in probation_lru first.
*/
void shrink_cache()
{
	FWD_ITER(&protected_lru, lru_machine,
	    if ( above_cache_limit() && can_deallocate(link_iter)) {
		    /* Record the synode of the machine being evicted */
		    last_removed_cache = link_iter->pax.synode;
		    hash_out(&link_iter->pax); /* Remove from hash table */
		    link_into(link_out(&link_iter->lru_link), &probation_lru); /* Put in probation lru */
		    init_pax_machine(&link_iter->pax, link_iter, null_synode);
	    } else {
		    /* Below the limit, or hit a machine that must be kept: stop */
		    return;
	    }
	);
}
348
/* Placeholder for cache-related variable initialization; intentionally empty. */
void xcom_cache_var_init()
{
}
352
353 /* }}} */
354
355
356 /* {{{ Paxos machine */
357
358 /* Initialize a Paxos instance */
/*
  (Re)initialize a Paxos instance.
  p      - the machine to reset
  lru    - the cache slot that owns p (stored as a back-pointer)
  synode - the synode the machine will represent (null_synode for a free slot)
  Returns p.
*/
static pax_machine *init_pax_machine(pax_machine *p, lru_machine *lru, synode_no synode)
{
	/* Must run before the replace_pax_msg calls below: it subtracts the
	   size of the messages p still references from the cache accounting */
	sub_cache_size(pax_machine_size(p));
	link_init(&p->hash_link, type_hash("pax_machine"));
	p->lru = lru;
	p->synode = synode;
	p->last_modified = 0.0;
	link_init(&p->rv, type_hash("task_env"));
	init_ballot(&p->proposer.bal, 0, 0);
	init_ballot(&p->proposer.sent_prop, 0, 0);
	init_ballot(&p->proposer.sent_learn, -1, 0);
	/* Node sets are allocated lazily and reused across reinitializations */
	if (!p->proposer.prep_nodeset)
		p->proposer.prep_nodeset = new_bit_set(NSERVERS);
	BIT_ZERO(p->proposer.prep_nodeset);
	if (!p->proposer.prop_nodeset)
		p->proposer.prop_nodeset = new_bit_set(NSERVERS);
	BIT_ZERO(p->proposer.prop_nodeset);
	/* Reset message references (replace_pax_msg handles the old message) */
	replace_pax_msg(&p->proposer.msg, NULL);
	init_ballot(&p->acceptor.promise, 0, 0);
	replace_pax_msg(&p->acceptor.msg, NULL);
	replace_pax_msg(&p->learner.msg, NULL);
	p->lock = 0;
	p->op = initial_op;
	p->force_delivery = 0;
	return p;
}
385
/*
  Try to mark p as busy.  Returns the previous lock state:
  0 means the lock was acquired, nonzero means it was already held.
*/
int lock_pax_machine(pax_machine *p)
{
	int const was_locked = p->lock;
	if (!was_locked)
		p->lock = 1;
	return was_locked;
}
393
/* Clear the busy flag of p. */
void unlock_pax_machine(pax_machine *p)
{
	p->lock = 0;
}
398
/* Return nonzero if p is currently locked (busy). */
int is_busy_machine(pax_machine *p)
{
	return p->lock;
}
403
404 /* purecov: begin deadcode */
405 /* Debug nodesets of Paxos instance */
/* Debug nodesets of Paxos instance.
   Returns a freshly built debug string (GOUT protocol); caller owns it. */
char *dbg_machine_nodeset(pax_machine *p, u_int nodes)
{
	GET_NEW_GOUT;
	STRLIT("proposer.prep_nodeset ");
	COPY_AND_FREE_GOUT(dbg_bitset(p->proposer.prep_nodeset, nodes));
	STRLIT("proposer.prop_nodeset ");
	COPY_AND_FREE_GOUT(dbg_bitset(p->proposer.prop_nodeset, nodes));
	RET_GOUT;
}
415
416
417 /* Debug a Paxos instance */
/* Debug a Paxos instance.
   Returns a freshly built debug string (GOUT protocol) describing p's
   ballots, messages and lock state; handles p == NULL. */
char *dbg_pax_machine(pax_machine *p)
{
	GET_NEW_GOUT;
	if (!p) {
		STRLIT("p == 0 ");
		RET_GOUT;
	}
	PTREXP(p);
	COPY_AND_FREE_GOUT(dbg_machine_nodeset(p, get_maxnodes(find_site_def(p->synode))));
	BALCEXP(p->proposer.bal);
	BALCEXP(p->proposer.sent_prop);
	BALCEXP(p->proposer.sent_learn);
	BALCEXP(p->acceptor.promise);
	STRLIT("proposer.msg ");
	COPY_AND_FREE_GOUT(dbg_pax_msg(p->proposer.msg));
	STRLIT("acceptor.msg ");
	COPY_AND_FREE_GOUT(dbg_pax_msg(p->acceptor.msg));
	STRLIT("learner.msg ");
	COPY_AND_FREE_GOUT(dbg_pax_msg(p->learner.msg));
	NDBG(p->last_modified,f);
	NDBG(p->lock,d);
	STREXP(pax_op_to_str(p->op));
	RET_GOUT;
}
442
443 /* purecov: end */
444
445 /*
446 Return the size of a pax_msg. Counts only the pax_msg struct itself
447 and the size of the app_data.
448 */
get_app_msg_size(pax_msg const * p)449 static inline size_t get_app_msg_size(pax_msg const *p)
450 {
451 if(!p)
452 return (size_t) 0;
453 else
454 return sizeof(pax_msg) + app_data_list_size(p->a);
455 }
456
457 /*
458 Return the size of the messages referenced by a pax_machine.
459 The pax_machine itself is statically allocated, so we do
460 not count this when computing the cache size.
461 */
pax_machine_size(pax_machine const * p)462 size_t pax_machine_size(pax_machine const *p)
463 {
464 size_t size = get_app_msg_size(p->proposer.msg);
465
466 if (p->acceptor.msg && p->proposer.msg != p->acceptor.msg)
467 size += get_app_msg_size(p->acceptor.msg);
468
469 if (p->learner.msg && p->acceptor.msg != p->learner.msg && p->proposer.msg != p->learner.msg)
470 size += get_app_msg_size(p->learner.msg);
471 return size;
472 }
473
474 static size_t cache_size = 0;
475
/* The cache itself is statically allocated, set size of dynamically allocated data to 0 */
/* The cache itself is statically allocated; reset the count of
   dynamically allocated message data to 0. */
void init_cache_size()
{
	cache_size = 0;
}
481
482 /* Add to cache size */
/* Add x bytes to the cache size accounting; returns the new total. */
size_t add_cache_size(size_t x)
{
	cache_size += x;
	if (DBG_CACHE_SIZE && x) {
		/* Trace every nonzero adjustment when size debugging is enabled */
		G_DEBUG("%f %s:%d cache_size %lu x %lu", seconds(), __FILE__, __LINE__,
		    (long unsigned int)cache_size, (long unsigned int)x);
	}
	return cache_size;
}
493
494 /* Subtract from cache size */
/* Subtract x bytes from the cache size accounting; returns the new total. */
size_t sub_cache_size(size_t x)
{
	cache_size -= x;
	if (DBG_CACHE_SIZE && x) {
		/* Trace every nonzero adjustment when size debugging is enabled */
		G_DEBUG("%f %s:%d cache_size %lu x %lu", seconds(), __FILE__, __LINE__,
		    (long unsigned int)cache_size, (long unsigned int)x);
	}
	return cache_size;
}
505
506 /* See if cache is above limit */
above_cache_limit()507 int above_cache_limit()
508 {
509 return the_app_xcom_cfg && cache_size > the_app_xcom_cfg->cache_limit ;
510 }
511
/* If the cfg object exists, set max cache size */
/* If the cfg object exists, set the max cache size to x and return it;
   otherwise return 0. */
size_t set_max_cache_size(size_t x)
{
	if (!the_app_xcom_cfg)
		return 0;
	return the_app_xcom_cfg->cache_limit = x;
}
520 /* }}} */
521