/* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

#include <rpc/rpc.h>
#include <assert.h>

#include <stdlib.h>

#include "xcom_common.h"
#include "simset.h"
#include "xcom_vp.h"
#include "xcom_cache.h"
#include "task.h"
#include "node_no.h"
#include "server_struct.h"
#include "xcom_detector.h"
#include "site_struct.h"
#include "xcom_transport.h"
#include "xcom_base.h"
#include "pax_msg.h"
#include "xcom_vp_str.h"
#include "synode_no.h"
#include "task.h"
#include "task_debug.h"
#include "site_def.h"
#include "bitset.h"
#include "app_data.h"
#include "simset.h"
#include "xcom_cfg.h"

#define DBG_CACHE_SIZE 0

/* Protect at least MIN_CACHED * (number of nodes) pax_machine objects from deallocation by shrink_cache */
#define MIN_CACHED 10
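/* E.g. in a three-node group this keeps roughly the last 10 * 3 = 30 Paxos
   instances protected; see the msgno margin test in can_deallocate() below. */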

/* {{{ Paxos machine cache */

struct lru_machine {
  linkage lru_link;
  pax_machine pax;
};

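/* Synode of the Paxos instance most recently removed from the cache; used by
   was_removed_from_cache() to tell whether a synode may already have been
   evicted */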
static synode_no last_removed_cache;

int was_removed_from_cache(synode_no x)
{
	ADD_EVENTS(
	    add_event(string_arg("x "));
	    add_synode_event(x);
	    add_event(string_arg("last_removed_cache "));
	    add_synode_event(last_removed_cache);
	);
	/*
	What to do with requests from nodes that have a different group ID?
	Should we just ignore them, as we do with the current code,
	or should we do something about it?
	*/
	return last_removed_cache.group_id == x.group_id && !synode_gt(x, last_removed_cache);
}

#define BUCKETS (CACHED)

static linkage pax_hash[BUCKETS]; /* Hash link table */
static lru_machine cache[CACHED]; /* The Paxos instances, plus a link for the LRU chain */
static linkage protected_lru = {0, &protected_lru, &protected_lru}; /* Head of LRU chain of cache hits */
static linkage probation_lru = {0, &probation_lru, &probation_lru}; /* Head of LRU chain of cache misses */

static pax_machine *init_pax_machine(pax_machine *p, lru_machine *lru, synode_no synode);

static void hash_init()
{
  unsigned int i = 0;
  for (i = 0; i < BUCKETS; i++) {
    link_init(&pax_hash[i], type_hash("pax_machine"));
  }
}

extern void hexdump(void *p, long length);

#if 0
#define FNVSTART 0x811c9dc5

/* Fowler-Noll-Vo type multiplicative hash */
static uint32_t fnv_hash(unsigned char *buf, size_t length, uint32_t sum)
{
  size_t i = 0;
  for (i = 0; i < length; i++) {
    sum = sum * (uint32_t)0x01000193 ^ (uint32_t)buf[i];
  }
  return sum;
}

static unsigned int synode_hash(synode_no synode)
{
  /* Need to hash three fields separately, since struct may contain padding with
     undefined values */
  return fnv_hash((unsigned char *)&synode.node, sizeof(synode.node),
                  fnv_hash((unsigned char *)&synode.group_id, sizeof(synode.group_id),
                           fnv_hash((unsigned char *)&synode.msgno, sizeof(synode.msgno), FNVSTART)))
    % BUCKETS;
}
#else
static unsigned int synode_hash(synode_no synode)
{
  /* Need to hash three fields separately, since struct may contain padding with
     undefined values */
  return (unsigned int)
    (4711 * synode.node + 5 * synode.group_id + synode.msgno) % BUCKETS;
}

#endif

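/* Insert a pax_machine into the hash table, keyed by its synode */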
static pax_machine *hash_in(pax_machine *p)
{
  MAY_DBG(FN; PTREXP(p);
  SYCEXP(p->synode);
  );
  link_into(&p->hash_link, &pax_hash[synode_hash(p->synode)]);
  return p;
}

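/* Unlink a pax_machine from the hash table and return it */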
static pax_machine *hash_out(pax_machine *p)
{
  MAY_DBG(FN; PTREXP(p);
  SYCEXP(p->synode);
  );
  return (pax_machine *)link_out(&p->hash_link);
}

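/* Look up the pax_machine for a synode; return NULL if it is not cached */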
pax_machine *hash_get(synode_no synode)
{
  /* static pax_machine *cached_machine = NULL; */
  linkage *bucket = &pax_hash[synode_hash(synode)];

  /* if(cached_machine && synode_eq(synode, cached_machine->synode)) */
  /*   return cached_machine; */

  FWD_ITER(bucket, pax_machine,
           if (synode_eq(link_iter->synode, synode)) {
             /* cached_machine = link_iter; */
             return link_iter;
           }
           );
  return NULL;
}

#if 0
static int is_noop(synode_no synode)
{
  if (is_cached(synode)) {
    pax_machine *m = get_cache(synode);
    return m->learner.msg && m->learner.msg->msg_type == no_op;
  } else {
    return 0;
  }
}
#endif

/*
Get a machine for (re)use.
The machines are statically allocated, and organized in two lists.
probation_lru is the free list.
protected_lru tracks the machines that are currently in the cache in
least recently used order.
*/
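/*
  How the two lists interact (see get_cache() and shrink_cache() below):
  get_cache() takes a machine from probation_lru if one is free, otherwise it
  recycles the least recently used non-busy machine from protected_lru, and
  then links the machine into protected_lru.
  shrink_cache() moves deallocatable machines from protected_lru back to
  probation_lru until the cache size is below the configured limit.
*/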
static lru_machine *lru_get()
{
	lru_machine *retval = NULL;
	if (!link_empty(&probation_lru)) {
		retval = (lru_machine *)link_first(&probation_lru);
	} else {
		/* Find the first non-busy instance in the LRU */
		FWD_ITER(&protected_lru, lru_machine,
			if (!is_busy_machine(&link_iter->pax)) {
				retval = link_iter;
				/* Since this machine is in the cache, we need to update
				   last_removed_cache */
				last_removed_cache = retval->pax.synode;
				break;
			}
		)
	}
	assert(retval && !is_busy_machine(&retval->pax));
	return retval;
}

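/* On a cache hit, move the machine to the most recently used end of
   protected_lru */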
static lru_machine *lru_touch_hit(pax_machine *p)
{
  lru_machine *lru = p->lru;
  link_into(link_out(&lru->lru_link), &protected_lru);
  return lru;
}


/* Initialize the message cache */
void init_cache()
{
  unsigned int i = 0;
  link_init(&protected_lru, type_hash("lru_machine"));
  link_init(&probation_lru, type_hash("lru_machine"));
  hash_init();
  for (i = 0; i < CACHED; i++) {
    lru_machine *l = &cache[i];
    link_init(&l->lru_link, type_hash("lru_machine"));
    link_into(&l->lru_link, &probation_lru);
    init_pax_machine(&l->pax, l, null_synode);
  }
  init_cache_size(); /* After the cache has been initialized, its size is 0 */
  last_removed_cache = null_synode;
}

void deinit_cache()
{
  int i= 0;
  /*
    We reset the memory structures before claiming back memory.
    Since deiniting the cache happens rarely - mostly when the
    XCom thread terminates - we are ok with doing it like this,
    i.e., at the cost of an additional loop and potential extra
    allocations, before deallocating.

    We do this to not clutter the execution flow and to improve
    readability and maintainability by keeping the source code
    for the deactivation routine simple and straightforward.
  */
  init_cache();
  for (i= 0; i < CACHED; i++)
  {
    lru_machine *l= &cache[i];
    pax_machine *p= &l->pax;
    if (p->proposer.prep_nodeset)
    {
      free_bit_set(p->proposer.prep_nodeset);
      p->proposer.prep_nodeset= NULL;
    }
    if (p->proposer.prop_nodeset)
    {
      free_bit_set(p->proposer.prop_nodeset);
      p->proposer.prop_nodeset= NULL;
    }
  }
}

/* static synode_no log_tail; */

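/* Return the pax_machine for synode. If it is not in the cache, recycle a
   machine from the LRU and rebind it to synode, but do not update the LRU
   order of the returned machine */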
pax_machine *get_cache_no_touch(synode_no synode)
{
  pax_machine *retval = hash_get(synode);
  /* DBGOUT(FN; SYCEXP(synode); STREXP(task_name())); */
  MAY_DBG(FN; SYCEXP(synode); PTREXP(retval));
  if (!retval) {
    lru_machine *l = lru_get(); /* Need to know when it is safe to re-use... */
    MAY_DBG(FN; PTREXP(l);
    COPY_AND_FREE_GOUT(dbg_pax_machine(&l->pax));
    );
    /* assert(l->pax.synode > log_tail); */

    retval = hash_out(&l->pax);          /* Remove from hash table */
    init_pax_machine(retval, l, synode); /* Initialize */
    hash_in(retval);                     /* Insert in hash table again */
  }
  MAY_DBG(FN; SYCEXP(synode); PTREXP(retval));
  return retval;
}

pax_machine *get_cache(synode_no synode)
{
  pax_machine *retval = get_cache_no_touch(synode);
  lru_touch_hit(retval); /* Insert in protected_lru */
  MAY_DBG(FN; SYCEXP(synode); PTREXP(retval));
  return retval;
}

static inline int can_deallocate(lru_machine *link_iter)
{
	synode_no delivered_msg;
	site_def const *site = get_site_def();
	site_def const *dealloc_site = find_site_def(link_iter->pax.synode);

	/* If we have no site, or the site was just installed, refuse deallocation */
	if (site == 0)
		return 0;
	/*
		With the patch that was put in to ensure that a node always sees a
		global view message when it joins, the joining node may need messages
		which are significantly behind the point where it joins (effectively
		starting with the latest config). So there is a very real risk that a
		node which joined might find that those messages had been removed,
		since all the other nodes had executed past that point. This test
		effectively stops garbage collection of old messages until the joining
		node has had a chance to tell the others about its low water mark. If
		it has not done that within DETECTOR_LIVE_TIMEOUT, it will be
		considered dead by the other nodes anyway, and expelled.
	*/
	if ((site->install_time + DETECTOR_LIVE_TIMEOUT) > task_now())
		return 0;
	if (dealloc_site == 0) /* Synode does not match any site, OK to deallocate */
		return 1;
	delivered_msg = get_min_delivered_msg(site);
	if (synode_eq(delivered_msg, null_synode)) /* Missing info from some node, not OK */
		return 0;
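	/* OK to deallocate if the instance belongs to a different group, or if its
	   msgno is at least MIN_CACHED behind the lowest delivered message */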
	return link_iter->pax.synode.group_id != delivered_msg.group_id ||
		(link_iter->pax.synode.msgno + MIN_CACHED) < delivered_msg.msgno;
}

/*
	Loop through the LRU (protected_lru) and deallocate objects until the size of
	the cache is below the limit.
	The freshly initialized objects are put into the probation_lru, so we can always
	start scanning at the end of protected_lru.
	lru_get will always look in probation_lru first.
*/
void shrink_cache()
{
	FWD_ITER(&protected_lru, lru_machine,
		if (above_cache_limit() && can_deallocate(link_iter)) {
			last_removed_cache = link_iter->pax.synode;
			hash_out(&link_iter->pax); /* Remove from hash table */
			link_into(link_out(&link_iter->lru_link), &probation_lru); /* Put in probation lru */
			init_pax_machine(&link_iter->pax, link_iter, null_synode);
		} else {
			return;
		}
	);
}

void xcom_cache_var_init()
{
}

/* }}} */


/* {{{ Paxos machine */

/* Initialize a Paxos instance */
static pax_machine *init_pax_machine(pax_machine *p, lru_machine *lru, synode_no synode)
{
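	/* The machine may be recycled; remove the size of any messages it still
	   references from the cache size accounting before they are replaced below */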
	sub_cache_size(pax_machine_size(p));
	link_init(&p->hash_link, type_hash("pax_machine"));
	p->lru = lru;
	p->synode = synode;
	p->last_modified = 0.0;
	link_init(&p->rv, type_hash("task_env"));
	init_ballot(&p->proposer.bal, 0, 0);
	init_ballot(&p->proposer.sent_prop, 0, 0);
	init_ballot(&p->proposer.sent_learn, -1, 0);
	if (!p->proposer.prep_nodeset)
		p->proposer.prep_nodeset = new_bit_set(NSERVERS);
	BIT_ZERO(p->proposer.prep_nodeset);
	if (!p->proposer.prop_nodeset)
		p->proposer.prop_nodeset = new_bit_set(NSERVERS);
	BIT_ZERO(p->proposer.prop_nodeset);
	replace_pax_msg(&p->proposer.msg, NULL);
	init_ballot(&p->acceptor.promise, 0, 0);
	replace_pax_msg(&p->acceptor.msg, NULL);
	replace_pax_msg(&p->learner.msg, NULL);
	p->lock = 0;
	p->op = initial_op;
	p->force_delivery = 0;
	return p;
}

int lock_pax_machine(pax_machine *p)
{
  int old = p->lock;
  if (!p->lock)
    p->lock = 1;
  return old;
}

void unlock_pax_machine(pax_machine *p)
{
  p->lock = 0;
}

int is_busy_machine(pax_machine *p)
{
  return p->lock;
}

/* purecov: begin deadcode */
/* Debug nodesets of Paxos instance */
char *dbg_machine_nodeset(pax_machine *p, u_int nodes)
{
	GET_NEW_GOUT;
	STRLIT("proposer.prep_nodeset ");
	COPY_AND_FREE_GOUT(dbg_bitset(p->proposer.prep_nodeset, nodes));
	STRLIT("proposer.prop_nodeset ");
	COPY_AND_FREE_GOUT(dbg_bitset(p->proposer.prop_nodeset, nodes));
	RET_GOUT;
}


/* Debug a Paxos instance */
char *dbg_pax_machine(pax_machine *p)
{
	GET_NEW_GOUT;
	if (!p) {
		STRLIT("p == 0 ");
		RET_GOUT;
	}
	PTREXP(p);
	COPY_AND_FREE_GOUT(dbg_machine_nodeset(p, get_maxnodes(find_site_def(p->synode))));
	BALCEXP(p->proposer.bal);
	BALCEXP(p->proposer.sent_prop);
	BALCEXP(p->proposer.sent_learn);
	BALCEXP(p->acceptor.promise);
	STRLIT("proposer.msg ");
	COPY_AND_FREE_GOUT(dbg_pax_msg(p->proposer.msg));
	STRLIT("acceptor.msg ");
	COPY_AND_FREE_GOUT(dbg_pax_msg(p->acceptor.msg));
	STRLIT("learner.msg ");
	COPY_AND_FREE_GOUT(dbg_pax_msg(p->learner.msg));
	NDBG(p->last_modified, f);
	NDBG(p->lock, d);
	STREXP(pax_op_to_str(p->op));
	RET_GOUT;
}

/* purecov: end */

/*
  Return the size of a pax_msg. Counts only the pax_msg struct itself
  and the size of the app_data.
*/
static inline size_t get_app_msg_size(pax_msg const *p)
{
  if (!p)
    return (size_t)0;
  else
    return sizeof(pax_msg) + app_data_list_size(p->a);
}

/*
  Return the size of the messages referenced by a pax_machine.
  The pax_machine itself is statically allocated, so we do
  not count this when computing the cache size.
*/
size_t pax_machine_size(pax_machine const *p)
{
  size_t size = get_app_msg_size(p->proposer.msg);

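  /* The proposer, acceptor and learner may share a pax_msg; count each
     message only once */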
  if (p->acceptor.msg && p->proposer.msg != p->acceptor.msg)
    size += get_app_msg_size(p->acceptor.msg);

  if (p->learner.msg && p->acceptor.msg != p->learner.msg && p->proposer.msg != p->learner.msg)
    size += get_app_msg_size(p->learner.msg);
  return size;
}

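/* Total size of the dynamically allocated data (pax_msg and app_data)
   currently referenced from the cache */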
static size_t cache_size = 0;

/* The cache itself is statically allocated; set the size of dynamically allocated data to 0 */
void init_cache_size()
{
  cache_size = 0;
}

/* Add to cache size */
size_t add_cache_size(size_t x)
{
  cache_size += x;
  if (DBG_CACHE_SIZE && x)
  {
    G_DEBUG("%f %s:%d cache_size %lu x %lu", seconds(), __FILE__, __LINE__,
            (long unsigned int)cache_size, (long unsigned int)x);
  }
  return cache_size;
}

/* Subtract from cache size */
size_t sub_cache_size(size_t x)
{
  cache_size -= x;
  if (DBG_CACHE_SIZE && x)
  {
    G_DEBUG("%f %s:%d cache_size %lu x %lu", seconds(), __FILE__, __LINE__,
            (long unsigned int)cache_size, (long unsigned int)x);
  }
  return cache_size;
}

/* See if cache is above limit */
int above_cache_limit()
{
  return the_app_xcom_cfg && cache_size > the_app_xcom_cfg->cache_limit;
}

/* If the cfg object exists, set the max cache size */
size_t set_max_cache_size(size_t x)
{
  if (the_app_xcom_cfg)
    return the_app_xcom_cfg->cache_limit = x;
  else
    return 0;
}
/* }}} */