1 /*
2  * %CopyrightBegin%
3  *
4  * Copyright Ericsson AB and Kjell Winblad 1998-2020. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * %CopyrightEnd%
19  */
20 
21 /*
22  * Description: Implementation of ETS ordered_set table type with
23  *              fine-grained synchronization.
24  *
25  * Author: 	Kjell Winblad
26  *
27  * This implementation is based on the contention adapting search tree
28  * (CA tree). The CA tree is a concurrent data structure that
29  * dynamically adapts its synchronization granularity based on how
30  * much contention is detected in locks. The following publication
31  * contains a detailed description of CA trees:
32  *
33  * A Contention Adapting Approach to Concurrent Ordered Sets
34  * Journal of Parallel and Distributed Computing, 2018
35  * Kjell Winblad and Konstantinos Sagonas
36  * https://doi.org/10.1016/j.jpdc.2017.11.007
37  *
38  * The following publication may also be interesting as it discusses
39  * how the CA tree can be used as an ETS ordered_set table type
40  * backend:
41  *
42  * More Scalable Ordered Set for ETS Using Adaptation
43  * In Thirteenth ACM SIGPLAN workshop on Erlang (2014)
44  * Kjell Winblad and Konstantinos Sagonas
45  * https://doi.org/10.1145/2633448.2633455
46  *
47  * This implementation of the ordered_set ETS table type is only
48  * activated when the options {write_concurrency, true}, public and
49  * ordered_set are passed to the ets:new/2 function. This
50  * implementation is expected to scale better than the default
51  * implementation located in "erl_db_tree.c".
52  *
53  * The default implementation has a static stack optimization (see
54  * get_static_stack in erl_db_tree.c). This implementation does not
55  * have such an optimization as it induces bad scalability when
56  * concurrent read operations are frequent (they all try to get hold
57  * of the same stack). The default implementation may thus perform
58  * better compared to this implementation in scenarios where the
59  * static stack optimization is useful. One such scenario is when only
60  * one process is accessing the table and this process is traversing
61  * the table with a sequence of next/2 calls.
62  */
63 
64 #ifdef HAVE_CONFIG_H
65 #  include "config.h"
66 #endif
67 
68 #include "sys.h"
69 #include "erl_vm.h"
70 #include "global.h"
71 #include "erl_process.h"
72 #include "error.h"
73 #define ERTS_WANT_DB_INTERNAL__
74 #include "erl_db.h"
75 #include "bif.h"
76 #include "big.h"
77 #include "erl_binary.h"
78 
79 #include "erl_db_catree.h"
80 #include "erl_db_tree.h"
81 #include "erl_db_tree_util.h"
82 
83 #ifdef DEBUG
84 #  define IF_DEBUG(X) X
85 #else
86 #  define IF_DEBUG(X)
87 #endif
88 
89 /*
90 ** Forward declarations
91 */
92 
93 static SWord do_delete_base_node_cont(DbTableCATree *tb,
94                                     DbTableCATreeNode *base_node,
95                                     SWord num_left);
96 
97 /* Method interface functions */
98 static int db_first_catree(Process *p, DbTable *tbl,
99                            Eterm *ret);
100 static int db_next_catree(Process *p, DbTable *tbl,
101                           Eterm key, Eterm *ret);
102 static int db_last_catree(Process *p, DbTable *tbl,
103                           Eterm *ret);
104 static int db_prev_catree(Process *p, DbTable *tbl,
105                           Eterm key,
106                           Eterm *ret);
107 static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail,
108                          SWord *consumed_reds_p);
109 static int db_get_catree(Process *p, DbTable *tbl,
110                          Eterm key,  Eterm *ret);
111 static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret);
112 static int db_get_element_catree(Process *p, DbTable *tbl,
113                                  Eterm key,int ndex,
114                                  Eterm *ret);
115 static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret);
116 static int db_erase_object_catree(DbTable *tbl, Eterm object,Eterm *ret);
117 static int db_slot_catree(Process *p, DbTable *tbl,
118                           Eterm slot_term,  Eterm *ret);
119 static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
120                             Eterm pattern, int reversed, Eterm *ret,
121                             enum DbIterSafety);
122 static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
123                                   Eterm pattern,  Eterm *ret, enum DbIterSafety);
124 static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
125                                   Eterm pattern, Sint chunk_size,
126                                   int reversed, Eterm *ret, enum DbIterSafety);
127 static int db_select_continue_catree(Process *p, DbTable *tbl,
128                                      Eterm continuation, Eterm *ret,
129                                      enum DbIterSafety*);
130 static int db_select_count_continue_catree(Process *p, DbTable *tbl,
131                                            Eterm continuation, Eterm *ret,
132                                            enum DbIterSafety*);
133 static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
134                                    Eterm pattern,  Eterm *ret,
135                                    enum DbIterSafety);
136 static int db_select_delete_continue_catree(Process *p, DbTable *tbl,
137                                             Eterm continuation, Eterm *ret,
138                                             enum DbIterSafety*);
139 static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
140                                     Eterm pattern, Eterm *ret,
141                                     enum DbIterSafety);
142 static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
143                                              Eterm continuation, Eterm *ret,
144                                              enum DbIterSafety*);
145 static int db_take_catree(Process *, DbTable *, Eterm, Eterm *);
146 static void db_print_catree(fmtfn_t to, void *to_arg,
147                             int show, DbTable *tbl);
148 static int db_free_table_catree(DbTable *tbl);
149 static SWord db_free_table_continue_catree(DbTable *tbl, SWord);
150 static void db_foreach_offheap_catree(DbTable *,
151                                       void (*)(ErlOffHeap *, void *),
152                                       void *);
153 static SWord db_delete_all_objects_catree(Process* p,
154                                           DbTable* tbl,
155                                           SWord reds,
156                                           Eterm* nitems_holder_wb);
157 static Eterm db_delete_all_objects_get_nitems_from_holder_catree(Process* p,
158                                                                  Eterm nitems_holder);
159 static int
160 db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj,
161                         DbUpdateHandle*);
162 static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *);
163 static int db_get_binary_info_catree(Process*, DbTable*, Eterm key, Eterm *ret);
164 static int db_put_dbterm_catree(DbTable* tbl,
165                                 void* obj,
166                                 int key_clash_fail,
167                                 SWord *consumed_reds_p);
168 
169 static void split_catree(DbTableCATree *tb,
170                          DbTableCATreeNode* ERTS_RESTRICT base,
171                          DbTableCATreeNode* ERTS_RESTRICT parent);
172 static void join_catree(DbTableCATree *tb,
173                         DbTableCATreeNode *thiz,
174                         DbTableCATreeNode *parent);
175 static ERTS_INLINE
176 int try_wlock_base_node(DbTableCATreeBaseNode *base_node);
177 static ERTS_INLINE
178 void wunlock_base_node(DbTableCATreeNode *base_node);
179 static ERTS_INLINE
180 void wlock_base_node_no_stats(DbTableCATreeNode *base_node);
181 static ERTS_INLINE
182 void wunlock_adapt_base_node(DbTableCATree* tb,
183                              DbTableCATreeNode* node,
184                              DbTableCATreeNode* parent,
185                              int current_level);
186 /*
187 ** External interface
188 */
189 DbTableMethod db_catree =
190 {
191     db_create_catree,
192     db_first_catree,
193     db_next_catree,
194     db_last_catree,
195     db_prev_catree,
196     db_put_catree,
197     db_get_catree,
198     db_get_element_catree,
199     db_member_catree,
200     db_erase_catree,
201     db_erase_object_catree,
202     db_slot_catree,
203     db_select_chunk_catree,
204     db_select_catree,
205     db_select_delete_catree,
206     db_select_continue_catree,
207     db_select_delete_continue_catree,
208     db_select_count_catree,
209     db_select_count_continue_catree,
210     db_select_replace_catree,
211     db_select_replace_continue_catree,
212     db_take_catree,
213     db_delete_all_objects_catree,
214     db_delete_all_objects_get_nitems_from_holder_catree,
215     db_free_table_catree,
216     db_free_table_continue_catree,
217     db_print_catree,
218     db_foreach_offheap_catree,
219     db_lookup_dbterm_catree,
220     db_finalize_dbterm_catree,
221     db_eterm_to_dbterm_tree_common,
222     db_dbterm_list_prepend_tree_common,
223     db_dbterm_list_remove_first_tree_common,
224     db_put_dbterm_catree,
225     db_free_dbterm_tree_common,
226     db_get_dbterm_key_tree_common,
227     db_get_binary_info_catree,
228     db_first_catree, /* raw_first same as first */
229     db_next_catree   /* raw_next same as next */
230 };
231 
232 /*
233  * Constants
234  */
235 
236 #define ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION 250
237 #define ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION (-1)
238 #define ERL_DB_CATREE_LOCK_GRAVITY_CONTRIBUTION (-500)
239 #define ERL_DB_CATREE_LOCK_GRAVITY_PATTERN (0xFF800000)
240 #define ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION (-10)
241 #define ERL_DB_CATREE_HIGH_CONTENTION_LIMIT 1000
242 #define ERL_DB_CATREE_LOW_CONTENTION_LIMIT (-1000)
243 #define ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT 16
244 #define ERL_DB_CATREE_LOCK_LOW_NO_CONTRIBUTION_LIMIT (-20000)
245 #define ERL_DB_CATREE_LOCK_HIGH_NO_CONTRIBUTION_LIMIT (20000)
246 
247 /*
248  * Internal CA tree related helper functions and macros
249  */
250 
251 #define GET_ROUTE_NODE_KEY(node) (node->u.route.key.term)
252 #define GET_BASE_NODE_LOCK(node) (&(node->u.base.lock))
253 #define GET_ROUTE_NODE_LOCK(node) (&(node->u.route.lock))
254 
255 
256 /* Helpers for reading and writing shared atomic variables */
257 
258 /* No memory barrier */
259 #define GET_ROOT(tb) ((DbTableCATreeNode*)erts_atomic_read_nob(&((tb)->root)))
260 #define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.left)))
261 #define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.right)))
262 #define SET_ROOT(tb, v) erts_atomic_set_nob(&((tb)->root), (erts_aint_t)(v))
263 #define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v));
264 #define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));
265 
266 
267 /* Release or acquire barriers */
268 #define GET_ROOT_ACQB(tb) ((DbTableCATreeNode*)erts_atomic_read_acqb(&((tb)->root)))
269 #define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.left)))
270 #define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.right)))
271 #define SET_ROOT_RELB(tb, v) erts_atomic_set_relb(&((tb)->root), (erts_aint_t)(v))
272 #define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v));
273 #define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));
274 
275 /* Change base node lock statistics */
276 #define BASE_NODE_STAT_SET(NODE, VALUE) erts_atomic_set_nob(&(NODE)->u.base.lock_statistics, VALUE)
277 #define BASE_NODE_STAT_READ(NODE) erts_atomic_read_nob(&(NODE)->u.base.lock_statistics)
278 #define BASE_NODE_STAT_ADD(NODE, VALUE)                                 \
279     do {                                                                \
280         Sint v = erts_atomic_read_nob(&((NODE)->u.base.lock_statistics)); \
281         ASSERT(VALUE > 0);                                              \
282         if(v < ERL_DB_CATREE_LOCK_HIGH_NO_CONTRIBUTION_LIMIT) {          \
283             erts_atomic_set_nob(&(NODE->u.base.lock_statistics), v + VALUE); \
284         }                                                               \
285     }while(0);
286 #define BASE_NODE_STAT_SUB(NODE, VALUE)                                 \
287     do {                                                                \
288         Sint v = erts_atomic_read_nob(&((NODE)->u.base.lock_statistics)); \
289         ASSERT(VALUE < 0);                                              \
290         if(v > ERL_DB_CATREE_LOCK_LOW_NO_CONTRIBUTION_LIMIT) {          \
291             erts_atomic_set_nob(&(NODE->u.base.lock_statistics), v + VALUE); \
292         }                                                               \
293     }while(0);
294 
295 
296 /* Compares a key to the key in a route node */
cmp_key_route(Eterm key,DbTableCATreeNode * obj)297 static ERTS_INLINE Sint cmp_key_route(Eterm key,
298                                       DbTableCATreeNode *obj)
299 {
300     return CMP(key, GET_ROUTE_NODE_KEY(obj));
301 }
302 
303 /*
304  * Used by the split_tree function
305  */
306 static ERTS_INLINE
less_than_two_elements(TreeDbTerm * root)307 int less_than_two_elements(TreeDbTerm *root)
308 {
309     return root == NULL || (root->left == NULL && root->right == NULL);
310 }
311 
312 /*
313  * Inserts a TreeDbTerm into a tree. Returns the new root.
314  */
315 static ERTS_INLINE
insert_TreeDbTerm(DbTableCATree * tb,TreeDbTerm * insert_to_root,TreeDbTerm * value_to_insert)316 TreeDbTerm* insert_TreeDbTerm(DbTableCATree *tb,
317                               TreeDbTerm *insert_to_root,
318                               TreeDbTerm *value_to_insert) {
319     /* Non recursive insertion in AVL tree, building our own stack */
320     TreeDbTerm **tstack[STACK_NEED];
321     int tpos = 0;
322     int dstack[STACK_NEED+1];
323     int dpos = 0;
324     int state = 0;
325     TreeDbTerm * base = insert_to_root;
326     TreeDbTerm **this = &base;
327     Sint c;
328     Eterm key;
329     int dir;
330     TreeDbTerm *p1, *p2, *p;
331 
332     key = GETKEY(tb, value_to_insert->dbterm.tpl);
333 
334     dstack[dpos++] = DIR_END;
335     for (;;)
336 	if (!*this) { /* Found our place */
337 	    state = 1;
338 	    *this = value_to_insert;
339 	    (*this)->balance = 0;
340 	    (*this)->left = (*this)->right = NULL;
341 	    break;
342 	} else if ((c = cmp_key(&tb->common, key, *this)) < 0) {
343 	    /* go lefts */
344 	    dstack[dpos++] = DIR_LEFT;
345 	    tstack[tpos++] = this;
346 	    this = &((*this)->left);
347 	} else { /* go right */
348 	    dstack[dpos++] = DIR_RIGHT;
349 	    tstack[tpos++] = this;
350 	    this = &((*this)->right);
351 	}
352 
353     while (state && ( dir = dstack[--dpos] ) != DIR_END) {
354 	this = tstack[--tpos];
355 	p = *this;
356 	if (dir == DIR_LEFT) {
357 	    switch (p->balance) {
358 	    case 1:
359 		p->balance = 0;
360 		state = 0;
361 		break;
362 	    case 0:
363 		p->balance = -1;
364 		break;
365 	    case -1: /* The icky case */
366 		p1 = p->left;
367 		if (p1->balance == -1) { /* Single LL rotation */
368 		    p->left = p1->right;
369 		    p1->right = p;
370 		    p->balance = 0;
371 		    (*this) = p1;
372 		} else { /* Double RR rotation */
373                     ASSERT(p1->right);
374 		    p2 = p1->right;
375 		    p1->right = p2->left;
376 		    p2->left = p1;
377 		    p->left = p2->right;
378 		    p2->right = p;
379 		    p->balance = (p2->balance == -1) ? +1 : 0;
380 		    p1->balance = (p2->balance == 1) ? -1 : 0;
381 		    (*this) = p2;
382 		}
383 		(*this)->balance = 0;
384 		state = 0;
385 		break;
386 	    }
387 	} else { /* dir == DIR_RIGHT */
388 	    switch (p->balance) {
389 	    case -1:
390 		p->balance = 0;
391 		state = 0;
392 		break;
393 	    case 0:
394 		p->balance = 1;
395 		break;
396 	    case 1:
397 		p1 = p->right;
398 		if (p1->balance == 1) { /* Single RR rotation */
399 		    p->right = p1->left;
400 		    p1->left = p;
401 		    p->balance = 0;
402 		    (*this) = p1;
403 		} else { /* Double RL rotation */
404                     ASSERT(p1->left);
405 		    p2 = p1->left;
406 		    p1->left = p2->right;
407 		    p2->right = p1;
408 		    p->right = p2->left;
409 		    p2->left = p;
410 		    p->balance = (p2->balance == 1) ? -1 : 0;
411 		    p1->balance = (p2->balance == -1) ? 1 : 0;
412 		    (*this) = p2;
413 		}
414 		(*this)->balance = 0;
415 		state = 0;
416 		break;
417 	    }
418 	}
419     }
420     return base;
421 }
422 
423 /*
424  * Split an AVL tree into two trees. The function stores the node
425  * containing the "split key" in the write back parameter
426  * split_key_wb. The function stores the left tree containing the keys
427  * that are smaller than the "split key" in the write back parameter
428  * left_wb and the tree containing the rest of the keys in the write
429  * back parameter right_wb.
430  */
split_tree(DbTableCATree * tb,TreeDbTerm * root,TreeDbTerm ** split_key_node_wb,TreeDbTerm ** left_wb,TreeDbTerm ** right_wb)431 static void split_tree(DbTableCATree *tb,
432                        TreeDbTerm *root,
433                        TreeDbTerm **split_key_node_wb,
434                        TreeDbTerm **left_wb,
435                        TreeDbTerm **right_wb) {
436     TreeDbTerm * split_node = NULL;
437     TreeDbTerm * left_root;
438     TreeDbTerm * right_root;
439     if (root->left == NULL) { /* To get non empty split */
440         *right_wb = root->right;
441         *split_key_node_wb = root->right;
442         root->right = NULL;
443         root->balance = 0;
444         *left_wb = root;
445         return;
446     }
447     split_node = root;
448     left_root = split_node->left;
449     split_node->left = NULL;
450     right_root = split_node->right;
451     split_node->right = NULL;
452     right_root = insert_TreeDbTerm(tb, right_root, split_node);
453     *split_key_node_wb = split_node;
454     *left_wb = left_root;
455     *right_wb = right_root;
456 }
457 
458 /*
459  * Used by the join_trees function
460  */
compute_tree_hight(TreeDbTerm * root)461 static ERTS_INLINE int compute_tree_hight(TreeDbTerm * root)
462 {
463     if(root == NULL) {
464         return 0;
465     } else {
466         TreeDbTerm * current_node = root;
467         int hight_so_far = 1;
468         while (current_node->left != NULL || current_node->right != NULL) {
469             if (current_node->balance == -1) {
470                 ASSERT(current_node->left != NULL);
471                 current_node = current_node->left;
472             } else {
473                 ASSERT(current_node->right != NULL);
474                 current_node = current_node->right;
475             }
476             hight_so_far = hight_so_far + 1;
477         }
478         return hight_so_far;
479     }
480 }
481 
482 /*
483  * Used by the join_trees function
484  */
485 static ERTS_INLINE
linkout_min_or_max_tree_node(TreeDbTerm ** root,int is_min)486 TreeDbTerm* linkout_min_or_max_tree_node(TreeDbTerm **root, int is_min)
487 {
488     TreeDbTerm **tstack[STACK_NEED];
489     int tpos = 0;
490     int dstack[STACK_NEED+1];
491     int dpos = 0;
492     int state = 0;
493     TreeDbTerm **this = root;
494     int dir;
495     TreeDbTerm *q = NULL;
496 
497     dstack[dpos++] = DIR_END;
498     for (;;) {
499         if (!*this) { /* Failure */
500             return NULL;
501         } else if (is_min && (*this)->left != NULL) {
502             dstack[dpos++] = DIR_LEFT;
503             tstack[tpos++] = this;
504             this = &((*this)->left);
505         } else if (!is_min && (*this)->right != NULL) {
506             dstack[dpos++] = DIR_RIGHT;
507             tstack[tpos++] = this;
508             this = &((*this)->right);
509         } else { /* Min value, found the one to splice out */
510             q = (*this);
511             if (q->right == NULL) {
512                 (*this) = q->left;
513                 state = 1;
514             } else if (q->left == NULL) {
515                 (*this) = q->right;
516                 state = 1;
517             }
518             break;
519         }
520     }
521     while (state && ( dir = dstack[--dpos] ) != DIR_END) {
522         this = tstack[--tpos];
523         if (dir == DIR_LEFT) {
524             state = tree_balance_left(this);
525         } else {
526             state = tree_balance_right(this);
527         }
528     }
529     return q;
530 }
531 
532 #define LINKOUT_MIN_TREE_NODE(root) linkout_min_or_max_tree_node(root, 1)
533 #define LINKOUT_MAX_TREE_NODE(root) linkout_min_or_max_tree_node(root, 0)
534 
535 /*
536  * Joins two AVL trees where all the keys in the left one are smaller
537  * then the keys in the right one and returns the resulting tree.
538  *
539  * The algorithm is described on page 474 in D. E. Knuth. The Art of
540  * Computer Programming: Sorting and Searching,
541  * vol. 3. Addison-Wesley, 2nd edition, 1998.
542  */
join_trees(TreeDbTerm * left_root_param,TreeDbTerm * right_root_param)543 static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
544                                TreeDbTerm *right_root_param)
545 {
546     TreeDbTerm **tstack[STACK_NEED];
547     int tpos = 0;
548     int dstack[STACK_NEED+1];
549     int dpos = 0;
550     int state = 1;
551     TreeDbTerm **this;
552     int dir;
553     TreeDbTerm *p1, *p2, *p;
554     TreeDbTerm *left_root = left_root_param;
555     TreeDbTerm *right_root = right_root_param;
556     int left_height;
557     int right_height;
558     int current_height;
559     dstack[dpos++] = DIR_END;
560     if (left_root == NULL) {
561         return right_root;
562     } else if (right_root == NULL) {
563         return left_root;
564     }
565 
566     left_height = compute_tree_hight(left_root);
567     right_height = compute_tree_hight(right_root);
568     if (left_height >= right_height) {
569         TreeDbTerm * new_root =
570             LINKOUT_MIN_TREE_NODE(&right_root);
571         int new_right_height = compute_tree_hight(right_root);
572         TreeDbTerm * current_node = left_root;
573         this = &left_root;
574         current_height = left_height;
575         while(current_height > new_right_height + 1) {
576             if (current_node->balance == -1) {
577                 current_height = current_height - 2;
578             } else {
579                 current_height = current_height - 1;
580             }
581             dstack[dpos++] = DIR_RIGHT;
582             tstack[tpos++] = this;
583             this = &((*this)->right);
584             current_node = current_node->right;
585         }
586         new_root->left = current_node;
587         new_root->right = right_root;
588         new_root->balance = new_right_height - current_height;
589         *this = new_root;
590     } else {
591         /* This case is symmetric to the previous case */
592         TreeDbTerm * new_root =
593             LINKOUT_MAX_TREE_NODE(&left_root);
594         int new_left_height = compute_tree_hight(left_root);
595         TreeDbTerm * current_node = right_root;
596         this = &right_root;
597         current_height = right_height;
598         while (current_height > new_left_height + 1) {
599             if (current_node->balance == 1) {
600                 current_height = current_height - 2;
601             } else {
602                 current_height = current_height - 1;
603             }
604             dstack[dpos++] = DIR_LEFT;
605             tstack[tpos++] = this;
606             this = &((*this)->left);
607             current_node = current_node->left;
608         }
609         new_root->right = current_node;
610         new_root->left = left_root;
611         new_root->balance = current_height - new_left_height;
612         *this = new_root;
613     }
614     /* Now we need to continue as if this was during the insert */
615     while (state && ( dir = dstack[--dpos] ) != DIR_END) {
616         this = tstack[--tpos];
617         p = *this;
618         if (dir == DIR_LEFT) {
619             switch (p->balance) {
620             case 1:
621                 p->balance = 0;
622                 state = 0;
623                 break;
624             case 0:
625                 p->balance = -1;
626                 break;
627             case -1: /* The icky case */
628                 p1 = p->left;
629                 if (p1->balance == -1) { /* Single LL rotation */
630                     p->left = p1->right;
631                     p1->right = p;
632                     p->balance = 0;
633                     (*this) = p1;
634                 } else { /* Double RR rotation */
635                     ASSERT(p1->right);
636                     p2 = p1->right;
637                     p1->right = p2->left;
638                     p2->left = p1;
639                     p->left = p2->right;
640                     p2->right = p;
641                     p->balance = (p2->balance == -1) ? +1 : 0;
642                     p1->balance = (p2->balance == 1) ? -1 : 0;
643                     (*this) = p2;
644                 }
645                 (*this)->balance = 0;
646                 state = 0;
647                 break;
648             }
649         } else { /* dir == DIR_RIGHT */
650             switch (p->balance) {
651             case -1:
652                 p->balance = 0;
653                 state = 0;
654                 break;
655             case 0:
656                 p->balance = 1;
657                 break;
658             case 1:
659                 p1 = p->right;
660                 if (p1->balance == 1) { /* Single RR rotation */
661                     p->right = p1->left;
662                     p1->left = p;
663                     p->balance = 0;
664                     (*this) = p1;
665                 } else { /* Double RL rotation */
666                     ASSERT(p1->left);
667                     p2 = p1->left;
668                     p1->left = p2->right;
669                     p2->right = p1;
670                     p->right = p2->left;
671                     p2->left = p;
672                     p->balance = (p2->balance == 1) ? -1 : 0;
673                     p1->balance = (p2->balance == -1) ? 1 : 0;
674                     (*this) = p2;
675                 }
676                 (*this)->balance = 0;
677                 state = 0;
678                 break;
679             }
680         }
681     }
682     /* Return the joined tree */
683     if (left_height >= right_height) {
684         return left_root;
685     } else {
686         return right_root;
687     }
688 }
689 
690 #ifdef DEBUG
691 #  define PROVOKE_RANDOM_SPLIT_JOIN
692 #endif
693 #ifdef PROVOKE_RANDOM_SPLIT_JOIN
dbg_fastrand(void)694 static int dbg_fastrand(void)
695 {
696     static int g_seed = 648835;
697     g_seed = (214013*g_seed+2531011);
698     return (g_seed>>16)&0x7FFF;
699 }
700 
dbg_provoke_random_splitjoin(DbTableCATree * tb,DbTableCATreeNode * base_node)701 static void dbg_provoke_random_splitjoin(DbTableCATree* tb,
702                                          DbTableCATreeNode* base_node)
703 {
704     if (tb->common.status & DB_CATREE_FORCE_SPLIT ||
705         !(tb->common.status & DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN))
706         return;
707 
708     switch (dbg_fastrand() % 8) {
709     case 1:
710         BASE_NODE_STAT_ADD(base_node, 1+ERL_DB_CATREE_HIGH_CONTENTION_LIMIT);
711         break;
712     case 2:
713         BASE_NODE_STAT_SUB(base_node, -1+ERL_DB_CATREE_LOW_CONTENTION_LIMIT);
714         break;
715     }
716 }
717 #else
718 #  define dbg_provoke_random_splitjoin(T,N)
719 #endif /* PROVOKE_RANDOM_SPLIT_JOIN */
720 
721 static ERTS_NOINLINE
do_random_join(DbTableCATree * tb,Uint rand)722 void do_random_join(DbTableCATree* tb, Uint rand)
723 {
724     DbTableCATreeNode* node = GET_ROOT_ACQB(tb);
725     DbTableCATreeNode* parent = NULL;
726     int level = 0;
727     Sint stat;
728     while (!node->is_base_node) {
729         parent = node;
730         if ((rand & (1 << level)) == 0) {
731             node = GET_LEFT_ACQB(node);
732         } else {
733             node = GET_RIGHT_ACQB(node);
734         }
735         level++;
736     }
737     BASE_NODE_STAT_SUB(node, ERL_DB_CATREE_LOCK_GRAVITY_CONTRIBUTION);
738     stat = BASE_NODE_STAT_READ(node);
739     if (stat >= ERL_DB_CATREE_LOW_CONTENTION_LIMIT &&
740         stat <= ERL_DB_CATREE_HIGH_CONTENTION_LIMIT) {
741         return; /* No adaptation */
742     }
743     if (parent != NULL && !try_wlock_base_node(&node->u.base)) {
744         if (!node->u.base.is_valid) {
745             wunlock_base_node(node);
746             return;
747         }
748         wunlock_adapt_base_node(tb, node, parent, level);
749     }
750 }
751 
752 static ERTS_INLINE
do_random_join_with_low_probability(DbTableCATree * tb,Uint seed)753 void do_random_join_with_low_probability(DbTableCATree* tb, Uint seed)
754 {
755 #ifndef ERTS_DB_CA_TREE_NO_RANDOM_JOIN_WITH_LOW_PROBABILITY
756     Uint32 rand = erts_sched_local_random(seed);
757     if (((rand & ERL_DB_CATREE_LOCK_GRAVITY_PATTERN)) == 0) {
758         do_random_join(tb, rand);
759     }
760 #endif
761 }
762 
763 static ERTS_INLINE
try_wlock_base_node(DbTableCATreeBaseNode * base_node)764 int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
765 {
766     return EBUSY == erts_rwmtx_tryrwlock(&base_node->lock);
767 }
768 
769 /*
770  * Locks a base node without adjusting the lock statistics
771  */
772 static ERTS_INLINE
wlock_base_node_no_stats(DbTableCATreeNode * base_node)773 void wlock_base_node_no_stats(DbTableCATreeNode *base_node)
774 {
775     ASSERT(base_node->is_base_node);
776     erts_rwmtx_rwlock(&base_node->u.base.lock);
777 }
778 
779 /*
780  * Locks a base node and adjusts the lock statistics according to if
781  * the lock was contended or not
782  */
783 static ERTS_INLINE
wlock_base_node(DbTableCATreeNode * base_node)784 void wlock_base_node(DbTableCATreeNode *base_node)
785 {
786     ASSERT(base_node->is_base_node);
787     if (try_wlock_base_node(&base_node->u.base)) {
788         /* The lock is contended */
789         wlock_base_node_no_stats(base_node);
790         BASE_NODE_STAT_ADD(base_node, ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION);
791     } else {
792         BASE_NODE_STAT_SUB(base_node, ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION);
793     }
794 }
795 
796 static ERTS_INLINE
wunlock_base_node(DbTableCATreeNode * base_node)797 void wunlock_base_node(DbTableCATreeNode *base_node)
798 {
799     erts_rwmtx_rwunlock(&base_node->u.base.lock);
800 }
801 
802 static ERTS_INLINE
wunlock_adapt_base_node(DbTableCATree * tb,DbTableCATreeNode * node,DbTableCATreeNode * parent,int current_level)803 void wunlock_adapt_base_node(DbTableCATree* tb,
804                              DbTableCATreeNode* node,
805                              DbTableCATreeNode* parent,
806                              int current_level)
807 {
808     Sint base_node_lock_stat = BASE_NODE_STAT_READ(node);
809     dbg_provoke_random_splitjoin(tb,node);
810     if ((!node->u.base.root && parent && !(tb->common.status
811                                            & DB_CATREE_FORCE_SPLIT))
812         || base_node_lock_stat < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) {
813         join_catree(tb, node, parent);
814     }
815     else if (base_node_lock_stat > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT
816         && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) {
817         split_catree(tb, node, parent);
818     }
819     else {
820         wunlock_base_node(node);
821     }
822 }
823 
824 static ERTS_INLINE
rlock_base_node(DbTableCATreeNode * base_node)825 void rlock_base_node(DbTableCATreeNode *base_node)
826 {
827     ASSERT(base_node->is_base_node);
828     if (EBUSY == erts_rwmtx_tryrlock(&base_node->u.base.lock)) {
829         /* The lock is contended */
830         BASE_NODE_STAT_ADD(base_node, ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION);
831         erts_rwmtx_rlock(&base_node->u.base.lock);
832     }
833 }
834 
835 static ERTS_INLINE
runlock_base_node(DbTableCATreeNode * base_node,DbTableCATree * tb)836 void runlock_base_node(DbTableCATreeNode *base_node, DbTableCATree* tb)
837 {
838     ASSERT(base_node->is_base_node);
839     erts_rwmtx_runlock(&base_node->u.base.lock);
840     do_random_join_with_low_probability(tb, (Uint)base_node);
841 }
842 
843 static ERTS_INLINE
runlock_base_node_no_rand(DbTableCATreeNode * base_node)844 void runlock_base_node_no_rand(DbTableCATreeNode *base_node)
845 {
846     ASSERT(base_node->is_base_node);
847     erts_rwmtx_runlock(&base_node->u.base.lock);
848 }
849 
850 static ERTS_INLINE
lock_route_node(DbTableCATreeNode * route_node)851 void lock_route_node(DbTableCATreeNode *route_node)
852 {
853     ASSERT(!route_node->is_base_node);
854     erts_mtx_lock(&route_node->u.route.lock);
855 }
856 
857 static ERTS_INLINE
unlock_route_node(DbTableCATreeNode * route_node)858 void unlock_route_node(DbTableCATreeNode *route_node)
859 {
860     ASSERT(!route_node->is_base_node);
861     erts_mtx_unlock(&route_node->u.route.lock);
862 }
863 
864 static ERTS_INLINE
copy_route_key(DbRouteKey * dst,Eterm key,Uint key_size)865 Eterm copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
866 {
867     dst->size = key_size;
868     if (key_size != 0) {
869         Eterm* hp = &dst->heap[0];
870         ErlOffHeap tmp_offheap;
871         tmp_offheap.first  = NULL;
872         dst->term = copy_struct(key, key_size, &hp, &tmp_offheap);
873         dst->oh = tmp_offheap.first;
874     }
875     else {
876         ASSERT(is_immed(key));
877         dst->term = key;
878         dst->oh = NULL;
879     }
880     return dst->term;
881 }
882 
883 static ERTS_INLINE
destroy_route_key(DbRouteKey * key)884 void destroy_route_key(DbRouteKey* key)
885 {
886     if (key->oh) {
887         ErlOffHeap oh;
888         oh.first = key->oh;
889         erts_cleanup_offheap(&oh);
890     }
891 }
892 
893 static ERTS_INLINE
init_root_iterator(DbTableCATree * tb,CATreeRootIterator * iter,int read_only)894 void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter,
895                         int read_only)
896 {
897     iter->tb = tb;
898     iter->read_only = read_only;
899     iter->locked_bnode = NULL;
900     iter->next_route_key = THE_NON_VALUE;
901     iter->search_key = NULL;
902 }
903 
904 static ERTS_INLINE
lock_iter_base_node(CATreeRootIterator * iter,DbTableCATreeNode * base_node,DbTableCATreeNode * parent,int current_level)905 void lock_iter_base_node(CATreeRootIterator* iter,
906                          DbTableCATreeNode *base_node,
907                          DbTableCATreeNode *parent,
908                          int current_level)
909 {
910     ASSERT(!iter->locked_bnode);
911     if (iter->read_only)
912         rlock_base_node(base_node);
913     else {
914         wlock_base_node(base_node);
915         iter->bnode_parent = parent;
916         iter->bnode_level = current_level;
917     }
918     iter->locked_bnode = base_node;
919 }
920 
921 static ERTS_INLINE
unlock_iter_base_node(CATreeRootIterator * iter)922 void unlock_iter_base_node(CATreeRootIterator* iter)
923 {
924     ASSERT(iter->locked_bnode);
925     if (iter->read_only)
926         runlock_base_node(iter->locked_bnode, iter->tb);
927     else if (iter->locked_bnode->u.base.is_valid) {
928         wunlock_adapt_base_node(iter->tb, iter->locked_bnode,
929                                 iter->bnode_parent, iter->bnode_level);
930     }
931     else
932         wunlock_base_node(iter->locked_bnode);
933     iter->locked_bnode = NULL;
934 }
935 
936 static ERTS_INLINE
destroy_root_iterator(CATreeRootIterator * iter)937 void destroy_root_iterator(CATreeRootIterator* iter)
938 {
939     if (iter->locked_bnode)
940         unlock_iter_base_node(iter);
941     if (iter->search_key) {
942         destroy_route_key(iter->search_key);
943         erts_free(ERTS_ALC_T_DB_TMP, iter->search_key);
944     }
945 }
946 
947 typedef struct
948 {
949     DbTableCATreeNode *parent;
950     int current_level;
951 } FindBaseNode;
952 
953 static ERTS_INLINE
find_base_node(DbTableCATree * tb,Eterm key,FindBaseNode * fbn)954 DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key,
955                                   FindBaseNode* fbn)
956 {
957     DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb);
958     if (fbn) {
959         fbn->parent = NULL;
960         fbn->current_level = 0;
961     }
962     while (!node->is_base_node) {
963         if (fbn) {
964             fbn->current_level++;
965             fbn->parent = node;
966         }
967         if (cmp_key_route(key, node) < 0) {
968             node = GET_LEFT_ACQB(node);
969         } else {
970             node = GET_RIGHT_ACQB(node);
971         }
972     }
973     return node;
974 }
975 
976 static ERTS_INLINE
find_rlock_valid_base_node(DbTableCATree * tb,Eterm key)977 DbTableCATreeNode* find_rlock_valid_base_node(DbTableCATree* tb, Eterm key)
978 {
979     DbTableCATreeNode* base_node;
980 
981     while (1) {
982         base_node = find_base_node(tb, key, NULL);
983         rlock_base_node(base_node);
984         if (base_node->u.base.is_valid)
985             break;
986         runlock_base_node_no_rand(base_node);
987     }
988     return base_node;
989 }
990 
991 static ERTS_INLINE
find_wlock_valid_base_node(DbTableCATree * tb,Eterm key,FindBaseNode * fbn)992 DbTableCATreeNode* find_wlock_valid_base_node(DbTableCATree* tb, Eterm key,
993                                               FindBaseNode* fbn)
994 {
995     DbTableCATreeNode* base_node;
996 
997     while (1) {
998         base_node = find_base_node(tb, key, fbn);
999         wlock_base_node(base_node);
1000         if (base_node->u.base.is_valid)
1001             break;
1002         wunlock_base_node(base_node);
1003     }
1004     return base_node;
1005 }
1006 
1007 #ifdef ERTS_ENABLE_LOCK_CHECK
1008 #  define LC_ORDER(ORDER) ORDER
1009 #else
1010 #  define LC_ORDER(ORDER) NIL
1011 #endif
1012 
1013 #define sizeof_base_node() \
1014           offsetof(DbTableCATreeNode, u.base.end_of_struct__)
1015 
create_base_node(DbTableCATree * tb,TreeDbTerm * root)1016 static DbTableCATreeNode *create_base_node(DbTableCATree *tb,
1017                                            TreeDbTerm* root)
1018 {
1019     DbTableCATreeNode *p;
1020     erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
1021     p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, (DbTable *) tb,
1022                       sizeof_base_node());
1023 
1024     p->is_base_node = 1;
1025     p->u.base.root = root;
1026     if (tb->common.type & DB_FREQ_READ)
1027         rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
1028     if (erts_ets_rwmtx_spin_count >= 0)
1029         rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
1030 
1031     erts_rwmtx_init_opt(&p->u.base.lock, &rwmtx_opt,
1032                         "erl_db_catree_base_node",
1033                         NIL,
1034                         ERTS_LOCK_FLAGS_CATEGORY_DB);
1035     BASE_NODE_STAT_SET(p, ((tb->common.status & DB_CATREE_FORCE_SPLIT)
1036                            ? INT_MAX : 0));
1037     p->u.base.is_valid = 1;
1038     return p;
1039 }
1040 
sizeof_route_node(Uint key_size)1041 static ERTS_INLINE Uint sizeof_route_node(Uint key_size)
1042 {
1043     return (offsetof(DbTableCATreeNode, u.route.key.heap)
1044             + key_size*sizeof(Eterm));
1045 }
1046 
1047 static DbTableCATreeNode*
create_route_node(DbTableCATree * tb,DbTableCATreeNode * left,DbTableCATreeNode * right,DbTerm * keyTerm,DbTableCATreeNode * lc_parent)1048 create_route_node(DbTableCATree *tb,
1049                   DbTableCATreeNode *left,
1050                   DbTableCATreeNode *right,
1051                   DbTerm * keyTerm,
1052                   DbTableCATreeNode* lc_parent)
1053 {
1054     Eterm key = GETKEY(tb,keyTerm->tpl);
1055     int key_size = size_object(key);
1056     DbTableCATreeNode* p = erts_db_alloc(ERTS_ALC_T_DB_TABLE,
1057                                          (DbTable *) tb,
1058                                          sizeof_route_node(key_size));
1059 
1060     copy_route_key(&p->u.route.key, key, key_size);
1061     p->is_base_node = 0;
1062     p->u.route.is_valid = 1;
1063     erts_atomic_init_nob(&p->u.route.left, (erts_aint_t)left);
1064     erts_atomic_init_nob(&p->u.route.right, (erts_aint_t)right);
1065 #ifdef ERTS_ENABLE_LOCK_CHECK
1066     /* Route node lock order is inverse tree depth (from leafs toward root) */
1067     p->u.route.lc_order = (lc_parent == NULL ? MAX_SMALL :
1068                            lc_parent->u.route.lc_order - 1);
1069     /*
1070      * This assert may eventually fail as we don't increase 'lc_order' in join
1071      * operations when route nodes move up in the tree.
1072      * Tough luck if you run a lock-checking VM for such a long time on 32-bit.
1073      */
1074     ERTS_LC_ASSERT(p->u.route.lc_order >= 0);
1075 #endif
1076     erts_mtx_init(&p->u.route.lock, "erl_db_catree_route_node",
1077                   LC_ORDER(make_small(p->u.route.lc_order)),
1078                   ERTS_LOCK_FLAGS_CATEGORY_DB);
1079     return p;
1080 }
1081 
do_free_base_node(void * vptr)1082 static void do_free_base_node(void* vptr)
1083 {
1084     DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
1085     ASSERT(p->is_base_node);
1086     erts_rwmtx_destroy(&p->u.base.lock);
1087     erts_free(ERTS_ALC_T_DB_TABLE, p);
1088 }
1089 
free_catree_base_node(DbTableCATree * tb,DbTableCATreeNode * p)1090 static void free_catree_base_node(DbTableCATree* tb, DbTableCATreeNode* p)
1091 {
1092     ASSERT(p->is_base_node);
1093     ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(), 0);
1094     do_free_base_node(p);
1095 }
1096 
do_free_route_node(void * vptr)1097 static void do_free_route_node(void *vptr)
1098 {
1099     DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
1100     ASSERT(!p->is_base_node);
1101     erts_mtx_destroy(&p->u.route.lock);
1102     destroy_route_key(&p->u.route.key);
1103     erts_free(ERTS_ALC_T_DB_TABLE, p);
1104 }
1105 
free_catree_route_node(DbTableCATree * tb,DbTableCATreeNode * p)1106 static void free_catree_route_node(DbTableCATree* tb, DbTableCATreeNode* p)
1107 {
1108     ASSERT(!p->is_base_node);
1109     ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_route_node(p->u.route.key.size), 0);
1110     do_free_route_node(p);
1111 }
1112 
1113 
1114 /*
1115  * Returns the parent routing node of the specified
1116  * route node 'child' if such a parent exists
1117  * or NULL if 'child' is attached to the root.
1118  */
1119 static ERTS_INLINE DbTableCATreeNode *
parent_of(DbTableCATree * tb,DbTableCATreeNode * child)1120 parent_of(DbTableCATree *tb,
1121           DbTableCATreeNode *child)
1122 {
1123     Eterm key = GET_ROUTE_NODE_KEY(child);
1124     DbTableCATreeNode *current = GET_ROOT_ACQB(tb);
1125     DbTableCATreeNode *prev = NULL;
1126 
1127     while (current != child) {
1128         prev = current;
1129         if (cmp_key_route(key, current) < 0) {
1130             current = GET_LEFT_ACQB(current);
1131         } else {
1132             current = GET_RIGHT_ACQB(current);
1133         }
1134     }
1135     return prev;
1136 }
1137 
1138 
1139 static ERTS_INLINE DbTableCATreeNode *
leftmost_base_node(DbTableCATreeNode * root)1140 leftmost_base_node(DbTableCATreeNode *root)
1141 {
1142     DbTableCATreeNode *node = root;
1143     while (!node->is_base_node) {
1144         node = GET_LEFT_ACQB(node);
1145     }
1146     return node;
1147 }
1148 
1149 
1150 static ERTS_INLINE DbTableCATreeNode *
rightmost_base_node(DbTableCATreeNode * root)1151 rightmost_base_node(DbTableCATreeNode *root)
1152 {
1153     DbTableCATreeNode *node = root;
1154     while (!node->is_base_node) {
1155         node = GET_RIGHT_ACQB(node);
1156     }
1157     return node;
1158 }
1159 
1160 
1161 static ERTS_INLINE DbTableCATreeNode *
leftmost_route_node(DbTableCATreeNode * root)1162 leftmost_route_node(DbTableCATreeNode *root)
1163 {
1164     DbTableCATreeNode *node = root;
1165     DbTableCATreeNode *prev_node = NULL;
1166     while (!node->is_base_node) {
1167         prev_node = node;
1168         node = GET_LEFT_ACQB(node);
1169     }
1170     return prev_node;
1171 }
1172 
1173 static ERTS_INLINE DbTableCATreeNode*
rightmost_route_node(DbTableCATreeNode * root)1174 rightmost_route_node(DbTableCATreeNode *root)
1175 {
1176     DbTableCATreeNode * node = root;
1177     DbTableCATreeNode * prev_node = NULL;
1178     while (!node->is_base_node) {
1179         prev_node = node;
1180         node = GET_RIGHT_ACQB(node);
1181     }
1182     return prev_node;
1183 }
1184 
1185 static ERTS_INLINE
init_tree_stack(DbTreeStack * stack,TreeDbTerm ** stack_array,Uint init_slot)1186 void init_tree_stack(DbTreeStack *stack,
1187                      TreeDbTerm **stack_array,
1188                      Uint init_slot)
1189 {
1190     stack->array = stack_array;
1191     stack->pos = 0;
1192     stack->slot = init_slot;
1193 }
1194 
join_catree(DbTableCATree * tb,DbTableCATreeNode * thiz,DbTableCATreeNode * parent)1195 static void join_catree(DbTableCATree *tb,
1196                         DbTableCATreeNode *thiz,
1197                         DbTableCATreeNode *parent)
1198 {
1199     DbTableCATreeNode *gparent;
1200     DbTableCATreeNode *neighbor;
1201     DbTableCATreeNode *new_neighbor;
1202     DbTableCATreeNode *neighbor_parent;
1203 
1204     ASSERT(thiz->is_base_node);
1205     if (parent == NULL) {
1206         BASE_NODE_STAT_SET(thiz, 0);
1207         wunlock_base_node(thiz);
1208         return;
1209     }
1210     ASSERT(!parent->is_base_node);
1211     if (GET_LEFT(parent) == thiz) {
1212         neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
1213         if (try_wlock_base_node(&neighbor->u.base)) {
1214             /* Failed to acquire lock */
1215             BASE_NODE_STAT_SET(thiz, 0);
1216             wunlock_base_node(thiz);
1217             return;
1218         } else if (!neighbor->u.base.is_valid) {
1219             BASE_NODE_STAT_SET(thiz, 0);
1220             wunlock_base_node(thiz);
1221             wunlock_base_node(neighbor);
1222             return;
1223         } else {
1224             lock_route_node(parent);
1225             parent->u.route.is_valid = 0;
1226             neighbor->u.base.is_valid = 0;
1227             thiz->u.base.is_valid = 0;
1228             gparent = NULL;
1229             do {
1230                 if (gparent != NULL) {
1231                     unlock_route_node(gparent);
1232                 }
1233                 gparent = parent_of(tb, parent);
1234                 if (gparent != NULL)
1235                     lock_route_node(gparent);
1236             } while (gparent != NULL && !gparent->u.route.is_valid);
1237 
1238             if (gparent == NULL) {
1239                 SET_ROOT_RELB(tb, GET_RIGHT(parent));
1240             } else if (GET_LEFT(gparent) == parent) {
1241                 SET_LEFT_RELB(gparent, GET_RIGHT(parent));
1242             } else {
1243                 SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
1244             }
1245             unlock_route_node(parent);
1246             if (gparent != NULL) {
1247                 unlock_route_node(gparent);
1248             }
1249             {
1250                 TreeDbTerm* new_root = join_trees(thiz->u.base.root,
1251                                                   neighbor->u.base.root);
1252                 new_neighbor = create_base_node(tb, new_root);
1253             }
1254             if (GET_RIGHT(parent) == neighbor) {
1255                 neighbor_parent = gparent;
1256             } else {
1257                 neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
1258             }
1259         }
1260     } else { /* Symetric case */
1261         ASSERT(GET_RIGHT(parent) == thiz);
1262         neighbor = rightmost_base_node(GET_LEFT_ACQB(parent));
1263         if (try_wlock_base_node(&neighbor->u.base)) {
1264             /* Failed to acquire lock */
1265             BASE_NODE_STAT_SET(thiz, 0);
1266             wunlock_base_node(thiz);
1267             return;
1268         } else if (!neighbor->u.base.is_valid) {
1269             BASE_NODE_STAT_SET(thiz, 0);
1270             wunlock_base_node(thiz);
1271             wunlock_base_node(neighbor);
1272             return;
1273         } else {
1274             lock_route_node(parent);
1275             parent->u.route.is_valid = 0;
1276             neighbor->u.base.is_valid = 0;
1277             thiz->u.base.is_valid = 0;
1278             gparent = NULL;
1279             do {
1280                 if (gparent != NULL) {
1281                     unlock_route_node(gparent);
1282                 }
1283                 gparent = parent_of(tb, parent);
1284                 if (gparent != NULL) {
1285                     lock_route_node(gparent);
1286                 } else {
1287                     gparent = NULL;
1288                 }
1289             } while (gparent != NULL && !gparent->u.route.is_valid);
1290             if (gparent == NULL) {
1291                 SET_ROOT_RELB(tb, GET_LEFT(parent));
1292             } else if (GET_RIGHT(gparent) == parent) {
1293                 SET_RIGHT_RELB(gparent, GET_LEFT(parent));
1294             } else {
1295                 SET_LEFT_RELB(gparent, GET_LEFT(parent));
1296             }
1297             unlock_route_node(parent);
1298             if (gparent != NULL) {
1299                 unlock_route_node(gparent);
1300             }
1301             {
1302                 TreeDbTerm* new_root = join_trees(neighbor->u.base.root,
1303                                                   thiz->u.base.root);
1304                 new_neighbor = create_base_node(tb, new_root);
1305             }
1306             if (GET_LEFT(parent) == neighbor) {
1307                 neighbor_parent = gparent;
1308             } else {
1309                 neighbor_parent =
1310                     rightmost_route_node(GET_LEFT(parent));
1311             }
1312         }
1313     }
1314     /* Link in new neighbor and free nodes that are no longer in the tree */
1315     if (neighbor_parent == NULL) {
1316         SET_ROOT_RELB(tb, new_neighbor);
1317     } else if (GET_LEFT(neighbor_parent) == neighbor) {
1318         SET_LEFT_RELB(neighbor_parent, new_neighbor);
1319     } else {
1320         SET_RIGHT_RELB(neighbor_parent, new_neighbor);
1321     }
1322     wunlock_base_node(thiz);
1323     wunlock_base_node(neighbor);
1324     /* Free the parent and base */
1325     erts_schedule_db_free(&tb->common,
1326                           do_free_route_node,
1327                           parent,
1328                           &parent->u.route.free_item,
1329                           sizeof_route_node(parent->u.route.key.size));
1330     erts_schedule_db_free(&tb->common,
1331                           do_free_base_node,
1332                           thiz,
1333                           &thiz->u.base.free_item,
1334                           sizeof_base_node());
1335     erts_schedule_db_free(&tb->common,
1336                           do_free_base_node,
1337                           neighbor,
1338                           &neighbor->u.base.free_item,
1339                           sizeof_base_node());
1340 }
1341 
split_catree(DbTableCATree * tb,DbTableCATreeNode * ERTS_RESTRICT base,DbTableCATreeNode * ERTS_RESTRICT parent)1342 static void split_catree(DbTableCATree *tb,
1343                          DbTableCATreeNode* ERTS_RESTRICT base,
1344                          DbTableCATreeNode* ERTS_RESTRICT parent)
1345 {
1346     TreeDbTerm *splitOutWriteBack;
1347     DbTableCATreeNode* ERTS_RESTRICT new_left;
1348     DbTableCATreeNode* ERTS_RESTRICT new_right;
1349     DbTableCATreeNode* ERTS_RESTRICT new_route;
1350 
1351     if (less_than_two_elements(base->u.base.root)) {
1352         if (!(tb->common.status & DB_CATREE_FORCE_SPLIT))
1353             BASE_NODE_STAT_SET(base, 0);
1354         wunlock_base_node(base);
1355         return;
1356     } else {
1357         TreeDbTerm *left_tree;
1358         TreeDbTerm *right_tree;
1359 
1360         split_tree(tb, base->u.base.root, &splitOutWriteBack,
1361                    &left_tree, &right_tree);
1362 
1363         new_left = create_base_node(tb, left_tree);
1364         new_right = create_base_node(tb, right_tree);
1365         new_route = create_route_node(tb,
1366                                       new_left,
1367                                       new_right,
1368                                       &splitOutWriteBack->dbterm,
1369                                       parent);
1370         if (parent == NULL) {
1371             SET_ROOT_RELB(tb, new_route);
1372         } else if(GET_LEFT(parent) == base) {
1373             SET_LEFT_RELB(parent, new_route);
1374         } else {
1375             SET_RIGHT_RELB(parent, new_route);
1376         }
1377         base->u.base.is_valid = 0;
1378         wunlock_base_node(base);
1379         erts_schedule_db_free(&tb->common,
1380                               do_free_base_node,
1381                               base,
1382                               &base->u.base.free_item,
1383                               sizeof_base_node());
1384     }
1385 }
1386 
1387 /** @brief Free the entire catree and its sub-trees.
1388  *
1389  * @param reds Reductions to spend.
1390  * @return Reductions left. Negative value if not done.
1391  */
db_free_table_continue_catree(DbTable * tbl,SWord reds)1392 static SWord db_free_table_continue_catree(DbTable *tbl, SWord reds)
1393 {
1394     DbTableCATree *tb = &tbl->catree;
1395     DbTableCATreeNode *node;
1396     DbTableCATreeNode *parent;
1397     CATreeNodeStack rnode_stack;
1398     DbTableCATreeNode *rnode_stack_array[STACK_NEED];
1399 
1400     if (!tb->deletion) {
1401         /* First call */
1402         tb->deletion = 1;
1403         tb->nr_of_deleted_items = 0;
1404     }
1405 
1406     /*
1407      * The route tree is traversed and freed while keeping it consistent
1408      * during yields.
1409      */
1410     rnode_stack.array = rnode_stack_array;
1411     rnode_stack.pos = 0;
1412     rnode_stack.size = STACK_NEED;
1413 
1414     node = GET_ROOT(tb);
1415     if (node->is_base_node) {
1416         if (node->u.base.root) {
1417             reds = do_delete_base_node_cont(tb, node, reds);
1418             if (reds < 0)
1419                 return reds; /* Yield */
1420         }
1421         free_catree_base_node(tb, node);
1422     }
1423     else {
1424         for (;;) {
1425             DbTableCATreeNode* left = GET_LEFT(node);
1426             DbTableCATreeNode* right = GET_RIGHT(node);
1427 
1428             if (!left->is_base_node) {
1429                 PUSH_NODE(&rnode_stack, node);
1430                 node = left;
1431             }
1432             else if (!right->is_base_node) {
1433                 PUSH_NODE(&rnode_stack, node);
1434                 node = right;
1435             }
1436             else {
1437                 if (left->u.base.root) {
1438                     reds = do_delete_base_node_cont(tb, left, reds);
1439                     if (reds < 0)
1440                         return reds; /* Yield */
1441                 }
1442                 if (right->u.base.root) {
1443                     reds = do_delete_base_node_cont(tb, right, reds);
1444                     if (reds < 0)
1445                         return reds; /* Yield */
1446                 }
1447 
1448                 free_catree_base_node(tb, right);
1449                 free_catree_route_node(tb, node);
1450                 /*
1451                  * Keep empty left base node to join with its grandparent
1452                  * for tree consistency during yields.
1453                  */
1454 
1455                 parent = POP_NODE(&rnode_stack);
1456                 if (parent) {
1457                     if (node == GET_LEFT(parent)) {
1458                         SET_LEFT(parent, left);
1459                     }
1460                     else {
1461                         ASSERT(node == GET_RIGHT(parent));
1462                         SET_RIGHT(parent, left);
1463                     }
1464 
1465                     reds -= 2;
1466                     if (reds < 0)
1467                         return reds; /* Yield */
1468 
1469                     node = parent;
1470                 }
1471                 else {  /* Done */
1472                     free_catree_base_node(tb, left);
1473                     break;
1474                 }
1475             }
1476         }
1477     }
1478 
1479     ASSERT(reds >= 0);
1480     SET_ROOT(tb, NULL);
1481     return reds;
1482 }
1483 
1484 /** @brief Free all objects of a base node, but keep the base node.
1485  *
1486  * @param reds Reductions to spend.
1487  * @return Reductions left. Negative value if not done.
1488  */
do_delete_base_node_cont(DbTableCATree * tb,DbTableCATreeNode * base_node,SWord reds)1489 static SWord do_delete_base_node_cont(DbTableCATree *tb,
1490                                       DbTableCATreeNode *base_node,
1491                                       SWord reds)
1492 {
1493     TreeDbTerm *p;
1494     DbTreeStack stack;
1495     TreeDbTerm* stack_array[STACK_NEED];
1496 
1497     stack.pos = 0;
1498     stack.array = stack_array;
1499 
1500     p = base_node->u.base.root;
1501     for (;;) {
1502         if (p->left) {
1503             PUSH_NODE(&stack, p);
1504             p = p->left;
1505         }
1506         else if (p->right) {
1507             PUSH_NODE(&stack, p);
1508             p = p->right;
1509         }
1510         else {
1511             TreeDbTerm *parent;
1512 
1513             DEC_NITEMS((DbTable*)tb);
1514             tb->nr_of_deleted_items++;
1515             free_term((DbTable*)tb, p);
1516 
1517             parent = POP_NODE(&stack);
1518             if (!parent)
1519                 break;
1520             if (parent->left == p)
1521                 parent->left = NULL;
1522             else {
1523                 ASSERT(parent->right == p);
1524                 parent->right = NULL;
1525             }
1526             if (--reds < 0)
1527                 return reds;	/* Yield */
1528             p = parent;
1529         }
1530     }
1531     base_node->u.base.root = NULL;
1532     return reds;
1533 }
1534 
1535 
1536 /*
1537 ** Initialization function
1538 */
1539 
db_initialize_catree(void)1540 void db_initialize_catree(void)
1541 {
1542     return;
1543 };
1544 
1545 /*
1546 ** Table interface routines (i.e., what's called by the bif's)
1547 */
1548 
db_create_catree(Process * p,DbTable * tbl)1549 int db_create_catree(Process *p, DbTable *tbl)
1550 {
1551     DbTableCATree *tb = &tbl->catree;
1552     DbTableCATreeNode *root;
1553 
1554     root = create_base_node(tb, NULL);
1555     tb->deletion = 0;
1556     tb->nr_of_deleted_items = 0;
1557 #ifdef DEBUG
1558     tbl->common.status |= DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN;
1559 #endif
1560     erts_atomic_init_relb(&(tb->root), (erts_aint_t)root);
1561     return DB_ERROR_NONE;
1562 }
1563 
db_first_catree(Process * p,DbTable * tbl,Eterm * ret)1564 static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret)
1565 {
1566     TreeDbTerm *root;
1567     CATreeRootIterator iter;
1568     int result;
1569 
1570     init_root_iterator(&tbl->catree, &iter, 1);
1571     root = *catree_find_first_root(&iter);
1572     if (!root) {
1573         TreeDbTerm **pp = catree_find_next_root(&iter, NULL);
1574         root = pp ? *pp : NULL;
1575     }
1576 
1577     result = db_first_tree_common(p, tbl, root, ret, NULL);
1578 
1579     destroy_root_iterator(&iter);
1580     return result;
1581 }
1582 
db_next_catree(Process * p,DbTable * tbl,Eterm key,Eterm * ret)1583 static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
1584 {
1585     DbTreeStack stack;
1586     TreeDbTerm * stack_array[STACK_NEED];
1587     TreeDbTerm **rootp;
1588     CATreeRootIterator iter;
1589     int result;
1590 
1591     init_root_iterator(&tbl->catree, &iter, 1);
1592     iter.next_route_key = key;
1593     rootp = catree_find_next_root(&iter, NULL);
1594 
1595     do {
1596         init_tree_stack(&stack, stack_array, 0);
1597         result = db_next_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret, &stack);
1598         if (result != DB_ERROR_NONE || *ret != am_EOT)
1599             break;
1600 
1601         rootp = catree_find_next_root(&iter, NULL);
1602     } while (rootp);
1603 
1604     destroy_root_iterator(&iter);
1605     return result;
1606 }
1607 
db_last_catree(Process * p,DbTable * tbl,Eterm * ret)1608 static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
1609 {
1610     TreeDbTerm *root;
1611     CATreeRootIterator iter;
1612     int result;
1613 
1614     init_root_iterator(&tbl->catree, &iter, 1);
1615     root = *catree_find_last_root(&iter);
1616     if (!root) {
1617         TreeDbTerm **pp = catree_find_prev_root(&iter, NULL);
1618         root = pp ? *pp : NULL;
1619     }
1620 
1621     result = db_last_tree_common(p, tbl, root, ret, NULL);
1622 
1623     destroy_root_iterator(&iter);
1624     return result;
1625 }
1626 
db_prev_catree(Process * p,DbTable * tbl,Eterm key,Eterm * ret)1627 static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
1628 {
1629     DbTreeStack stack;
1630     TreeDbTerm * stack_array[STACK_NEED];
1631     TreeDbTerm **rootp;
1632     CATreeRootIterator iter;
1633     int result;
1634 
1635     init_root_iterator(&tbl->catree, &iter, 1);
1636     iter.next_route_key = key;
1637     rootp = catree_find_prev_root(&iter, NULL);
1638 
1639     do {
1640         init_tree_stack(&stack, stack_array, 0);
1641         result = db_prev_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret,
1642                                      &stack);
1643         if (result != DB_ERROR_NONE || *ret != am_EOT)
1644             break;
1645         rootp = catree_find_prev_root(&iter, NULL);
1646     } while (rootp);
1647 
1648     destroy_root_iterator(&iter);
1649     return result;
1650 }
1651 
db_put_dbterm_catree(DbTable * tbl,void * obj,int key_clash_fail,SWord * consumed_reds_p)1652 static int db_put_dbterm_catree(DbTable* tbl,
1653                                 void* obj,
1654                                 int key_clash_fail,
1655                                 SWord *consumed_reds_p)
1656 {
1657     TreeDbTerm *value_to_insert = obj;
1658     DbTableCATree *tb = &tbl->catree;
1659     Eterm key = GETKEY(tb, value_to_insert->dbterm.tpl);
1660     FindBaseNode fbn;
1661     DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
1662     int result = db_put_dbterm_tree_common(&tb->common,
1663                                            &node->u.base.root,
1664                                            value_to_insert,
1665                                            key_clash_fail,
1666                                            NULL);
1667     wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
1668     return result;
1669 }
1670 
db_put_catree(DbTable * tbl,Eterm obj,int key_clash_fail,SWord * consumed_reds_p)1671 static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail,
1672                          SWord *consumed_reds_p)
1673 {
1674     DbTableCATree *tb = &tbl->catree;
1675     Eterm key = GETKEY(&tb->common, tuple_val(obj));
1676     FindBaseNode fbn;
1677     DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
1678     int result = db_put_tree_common(&tb->common, &node->u.base.root, obj,
1679                                     key_clash_fail, NULL);
1680     wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
1681     return result;
1682 }
1683 
db_get_catree(Process * p,DbTable * tbl,Eterm key,Eterm * ret)1684 static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
1685 {
1686     DbTableCATree *tb = &tbl->catree;
1687     DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
1688     int result = db_get_tree_common(p, &tb->common,
1689                                     node->u.base.root,
1690                                     key, ret, NULL);
1691     runlock_base_node(node, tb);
1692     return result;
1693 }
1694 
catree_find_root(Eterm key,CATreeRootIterator * iter)1695 TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter)
1696 {
1697     FindBaseNode fbn;
1698     DbTableCATreeNode* base_node;
1699 
1700     while (1) {
1701         base_node = find_base_node(iter->tb, key, &fbn);
1702         lock_iter_base_node(iter, base_node, fbn.parent, fbn.current_level);
1703         if (base_node->u.base.is_valid)
1704             break;
1705         unlock_iter_base_node(iter);
1706     }
1707     return &base_node->u.base.root;
1708 }
1709 
save_iter_search_key(CATreeRootIterator * iter,Eterm key)1710 static Eterm save_iter_search_key(CATreeRootIterator* iter, Eterm key)
1711 {
1712     Uint key_size;
1713 
1714     if (is_immed(key))
1715         return key;
1716 
1717     if (iter->search_key) {
1718         if (key == iter->search_key->term)
1719             return key; /* already saved */
1720         destroy_route_key(iter->search_key);
1721     }
1722     key_size = size_object(key);
1723     if (!iter->search_key || key_size > iter->search_key->size) {
1724         iter->search_key = erts_realloc(ERTS_ALC_T_DB_TMP,
1725                                         iter->search_key,
1726                                         (offsetof(DbRouteKey, heap)
1727                                          + key_size*sizeof(Eterm)));
1728     }
1729     return copy_route_key(iter->search_key, key, key_size);
1730 }
1731 
catree_find_nextprev_root(CATreeRootIterator * iter,int forward,Eterm * search_keyp)1732 TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter,
1733                                        int forward,
1734                                        Eterm *search_keyp)
1735 {
1736 #ifdef DEBUG
1737     DbTableCATreeNode *rejected_invalid = NULL;
1738     DbTableCATreeNode *rejected_empty = NULL;
1739 #endif
1740     DbTableCATreeNode *node;
1741     DbTableCATreeNode *parent;
1742     DbTableCATreeNode* next_route_node;
1743     Eterm route_key = iter->next_route_key;
1744     int current_level;
1745 
1746     if (iter->locked_bnode) {
1747         if (search_keyp)
1748             *search_keyp = save_iter_search_key(iter, *search_keyp);
1749         unlock_iter_base_node(iter);
1750     }
1751 
1752     if (is_non_value(route_key))
1753         return NULL;
1754 
1755     while (1) {
1756         node = GET_ROOT_ACQB(iter->tb);
1757         current_level = 0;
1758         parent = NULL;
1759         next_route_node = NULL;
1760         while (!node->is_base_node) {
1761             current_level++;
1762             parent = node;
1763             if (forward) {
1764                 if (cmp_key_route(route_key,node) < 0) {
1765                     next_route_node = node;
1766                     node = GET_LEFT_ACQB(node);
1767                 } else {
1768                     node = GET_RIGHT_ACQB(node);
1769                 }
1770             }
1771             else {
1772                 if (cmp_key_route(route_key,node) > 0) {
1773                     next_route_node = node;
1774                     node = GET_RIGHT_ACQB(node);
1775                 } else {
1776                     node = GET_LEFT_ACQB(node);
1777                 }
1778             }
1779         }
1780         ASSERT(node != rejected_invalid);
1781         lock_iter_base_node(iter, node, parent, current_level);
1782         if (node->u.base.is_valid) {
1783             ASSERT(node != rejected_empty);
1784             if (node->u.base.root) {
1785                 iter->next_route_key = (next_route_node ?
1786                                         next_route_node->u.route.key.term :
1787                                         THE_NON_VALUE);
1788                 iter->locked_bnode = node;
1789                 return &node->u.base.root;
1790             }
1791             if (!next_route_node) {
1792                 unlock_iter_base_node(iter);
1793                 return NULL;
1794             }
1795             route_key = next_route_node->u.route.key.term;
1796             IF_DEBUG(rejected_empty = node);
1797         }
1798         else
1799             IF_DEBUG(rejected_invalid = node);
1800 
1801         /* Retry */
1802         unlock_iter_base_node(iter);
1803     }
1804 }
1805 
catree_find_next_root(CATreeRootIterator * iter,Eterm * keyp)1806 TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter, Eterm* keyp)
1807 {
1808     return catree_find_nextprev_root(iter, 1, keyp);
1809 }
1810 
catree_find_prev_root(CATreeRootIterator * iter,Eterm * keyp)1811 TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter, Eterm* keyp)
1812 {
1813     return catree_find_nextprev_root(iter, 0, keyp);
1814 }
1815 
1816 /** @brief Find root of tree where object with smallest key of all larger than
1817  * partially bound key may reside. Can be used as a starting point for
1818  * a reverse iteration with pb_key.
1819  *
1820  * @param pb_key The partially bound key. Example {42, '$1'}
1821  * @param iter An initialized root iterator.
1822  *
1823  * @return Pointer to found root pointer. May not be NULL.
1824  */
catree_find_next_from_pb_key_root(Eterm pb_key,CATreeRootIterator * iter)1825 TreeDbTerm** catree_find_next_from_pb_key_root(Eterm pb_key,
1826                                                CATreeRootIterator* iter)
1827 {
1828 #ifdef DEBUG
1829     DbTableCATreeNode *rejected_base = NULL;
1830 #endif
1831     DbTableCATreeNode *node;
1832     DbTableCATreeNode *parent;
1833     DbTableCATreeNode* next_route_node;
1834     int current_level;
1835 
1836     ASSERT(!iter->locked_bnode);
1837 
1838     while (1) {
1839         node = GET_ROOT_ACQB(iter->tb);
1840         current_level = 0;
1841         parent = NULL;
1842         next_route_node = NULL;
1843         while (!node->is_base_node) {
1844             current_level++;
1845             parent = node;
1846             if (cmp_partly_bound(pb_key, GET_ROUTE_NODE_KEY(node)) >= 0) {
1847                 next_route_node = node;
1848                 node = GET_RIGHT_ACQB(node);
1849             } else {
1850                 node = GET_LEFT_ACQB(node);
1851             }
1852         }
1853         ASSERT(node != rejected_base);
1854         lock_iter_base_node(iter, node, parent, current_level);
1855         if (node->u.base.is_valid) {
1856             iter->next_route_key = (next_route_node ?
1857                                     next_route_node->u.route.key.term :
1858                                     THE_NON_VALUE);
1859             return &node->u.base.root;
1860         }
1861         /* Retry */
1862         unlock_iter_base_node(iter);
1863 #ifdef DEBUG
1864         rejected_base = node;
1865 #endif
1866     }
1867 }
1868 
1869 /** @brief Find root of tree where object with largest key of all smaller than
1870  * partially bound key may reside. Can be used as a starting point for
1871  * a forward iteration with pb_key.
1872  *
1873  * @param pb_key The partially bound key. Example {42, '$1'}
1874  * @param iter An initialized root iterator.
1875  *
1876  * @return Pointer to found root pointer. May not be NULL.
1877  */
catree_find_prev_from_pb_key_root(Eterm key,CATreeRootIterator * iter)1878 TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key,
1879                                                CATreeRootIterator* iter)
1880 {
1881 #ifdef DEBUG
1882     DbTableCATreeNode *rejected_base = NULL;
1883 #endif
1884     DbTableCATreeNode *node;
1885     DbTableCATreeNode *parent;
1886     DbTableCATreeNode* next_route_node;
1887     int current_level;
1888 
1889     ASSERT(!iter->locked_bnode);
1890 
1891     while (1) {
1892         node = GET_ROOT_ACQB(iter->tb);
1893         current_level = 0;
1894         parent = NULL;
1895         next_route_node = NULL;
1896         while (!node->is_base_node) {
1897             current_level++;
1898             parent = node;
1899             if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) {
1900                 next_route_node = node;
1901                 node = GET_LEFT_ACQB(node);
1902             } else {
1903                 node = GET_RIGHT_ACQB(node);
1904             }
1905         }
1906         ASSERT(node != rejected_base);
1907         lock_iter_base_node(iter, node, parent, current_level);
1908         if (node->u.base.is_valid) {
1909             iter->next_route_key = (next_route_node ?
1910                                     next_route_node->u.route.key.term :
1911                                     THE_NON_VALUE);
1912             return &node->u.base.root;
1913         }
1914         /* Retry */
1915         unlock_iter_base_node(iter);
1916 #ifdef DEBUG
1917         rejected_base = node;
1918 #endif
1919     }
1920 }
1921 
catree_find_firstlast_root(CATreeRootIterator * iter,int first)1922 static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter,
1923                                                int first)
1924 {
1925 #ifdef DEBUG
1926     DbTableCATreeNode *rejected_base = NULL;
1927 #endif
1928     DbTableCATreeNode *node;
1929     DbTableCATreeNode* next_route_node;
1930     int current_level;
1931 
1932     while (1) {
1933         node = GET_ROOT_ACQB(iter->tb);
1934         current_level = 0;
1935         next_route_node = NULL;
1936         while (!node->is_base_node) {
1937             current_level++;
1938             next_route_node = node;
1939             node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node);
1940         }
1941         ASSERT(node != rejected_base);
1942         lock_iter_base_node(iter, node, next_route_node, current_level);
1943         if (node->u.base.is_valid) {
1944             iter->next_route_key = (next_route_node ?
1945                                     next_route_node->u.route.key.term :
1946                                     THE_NON_VALUE);
1947             return &node->u.base.root;
1948         }
1949         /* Retry */
1950         unlock_iter_base_node(iter);
1951 #ifdef DEBUG
1952         rejected_base = node;
1953 #endif
1954     }
1955 }
1956 
catree_find_first_root(CATreeRootIterator * iter)1957 TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter)
1958 {
1959     return catree_find_firstlast_root(iter, 1);
1960 }
1961 
catree_find_last_root(CATreeRootIterator * iter)1962 TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter)
1963 {
1964     return catree_find_firstlast_root(iter, 0);
1965 }
1966 
db_member_catree(DbTable * tbl,Eterm key,Eterm * ret)1967 static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret)
1968 {
1969     DbTableCATree *tb = &tbl->catree;
1970     DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
1971     int result = db_member_tree_common(&tb->common,
1972                                        node->u.base.root,
1973                                        key, ret, NULL);
1974     runlock_base_node(node, tb);
1975     return result;
1976 }
1977 
db_get_element_catree(Process * p,DbTable * tbl,Eterm key,int ndex,Eterm * ret)1978 static int db_get_element_catree(Process *p, DbTable *tbl,
1979 			       Eterm key, int ndex, Eterm *ret)
1980 {
1981     DbTableCATree *tb = &tbl->catree;
1982     DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
1983     int result = db_get_element_tree_common(p, &tb->common,
1984                                             node->u.base.root,
1985                                             key, ndex, ret, NULL);
1986     runlock_base_node(node, tb);
1987     return result;
1988 }
1989 
db_erase_catree(DbTable * tbl,Eterm key,Eterm * ret)1990 static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret)
1991 {
1992     DbTableCATree *tb = &tbl->catree;
1993     FindBaseNode fbn;
1994     DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
1995     int result = db_erase_tree_common(tbl, &node->u.base.root, key,
1996                                       ret, NULL);
1997     wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
1998     return result;
1999 }
2000 
db_erase_object_catree(DbTable * tbl,Eterm object,Eterm * ret)2001 static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
2002 {
2003     DbTableCATree *tb = &tbl->catree;
2004     Eterm key = GETKEY(&tb->common, tuple_val(object));
2005     FindBaseNode fbn;
2006     DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
2007     int result = db_erase_object_tree_common(tbl,
2008                                              &node->u.base.root,
2009                                              object,
2010                                              ret,
2011                                              NULL);
2012     wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
2013     return result;
2014 }
2015 
2016 
db_slot_catree(Process * p,DbTable * tbl,Eterm slot_term,Eterm * ret)2017 static int db_slot_catree(Process *p, DbTable *tbl,
2018                           Eterm slot_term, Eterm *ret)
2019 {
2020     int result;
2021     CATreeRootIterator iter;
2022 
2023     init_root_iterator(&tbl->catree, &iter, 1);
2024     result = db_slot_tree_common(p, tbl, *catree_find_first_root(&iter),
2025                                  slot_term, ret, NULL, &iter);
2026     destroy_root_iterator(&iter);
2027     return result;
2028 }
2029 
db_select_continue_catree(Process * p,DbTable * tbl,Eterm continuation,Eterm * ret,enum DbIterSafety * safety_p)2030 static int db_select_continue_catree(Process *p,
2031                                      DbTable *tbl,
2032                                      Eterm continuation,
2033                                      Eterm *ret,
2034                                      enum DbIterSafety* safety_p)
2035 {
2036     int result;
2037     CATreeRootIterator iter;
2038 
2039     init_root_iterator(&tbl->catree, &iter, 1);
2040     result = db_select_continue_tree_common(p, &tbl->common,
2041                                             continuation, ret, NULL, &iter);
2042     destroy_root_iterator(&iter);
2043     return result;
2044 }
2045 
db_select_catree(Process * p,DbTable * tbl,Eterm tid,Eterm pattern,int reverse,Eterm * ret,enum DbIterSafety safety)2046 static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
2047                             Eterm pattern, int reverse, Eterm *ret,
2048                             enum DbIterSafety safety)
2049 {
2050     int result;
2051     CATreeRootIterator iter;
2052 
2053     init_root_iterator(&tbl->catree, &iter, 1);
2054     result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret,
2055                                    NULL, &iter);
2056     destroy_root_iterator(&iter);
2057     return result;
2058 }
2059 
db_select_count_continue_catree(Process * p,DbTable * tbl,Eterm continuation,Eterm * ret,enum DbIterSafety * safety_p)2060 static int db_select_count_continue_catree(Process *p,
2061                                            DbTable *tbl,
2062                                            Eterm continuation,
2063                                            Eterm *ret,
2064                                            enum DbIterSafety* safety_p)
2065 {
2066     int result;
2067     CATreeRootIterator iter;
2068 
2069     init_root_iterator(&tbl->catree, &iter, 1);
2070     result = db_select_count_continue_tree_common(p, tbl,
2071                                                   continuation, ret, NULL,
2072                                                   &iter);
2073     destroy_root_iterator(&iter);
2074     return result;
2075 }
2076 
db_select_count_catree(Process * p,DbTable * tbl,Eterm tid,Eterm pattern,Eterm * ret,enum DbIterSafety safety)2077 static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
2078                                   Eterm pattern, Eterm *ret,
2079                                   enum DbIterSafety safety)
2080 {
2081     int result;
2082     CATreeRootIterator iter;
2083 
2084     init_root_iterator(&tbl->catree, &iter, 1);
2085     result = db_select_count_tree_common(p, tbl,
2086                                          tid, pattern, ret, NULL, &iter);
2087     destroy_root_iterator(&iter);
2088     return result;
2089 }
2090 
db_select_chunk_catree(Process * p,DbTable * tbl,Eterm tid,Eterm pattern,Sint chunk_size,int reversed,Eterm * ret,enum DbIterSafety safety)2091 static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
2092                                   Eterm pattern, Sint chunk_size,
2093                                   int reversed, Eterm *ret,
2094                                   enum DbIterSafety safety)
2095 {
2096     int result;
2097     CATreeRootIterator iter;
2098 
2099     init_root_iterator(&tbl->catree, &iter, 1);
2100     result = db_select_chunk_tree_common(p, tbl,
2101                                          tid, pattern, chunk_size, reversed, ret,
2102                                          NULL, &iter);
2103     destroy_root_iterator(&iter);
2104     return result;
2105 }
2106 
db_select_delete_continue_catree(Process * p,DbTable * tbl,Eterm continuation,Eterm * ret,enum DbIterSafety * safety_p)2107 static int db_select_delete_continue_catree(Process *p,
2108                                             DbTable *tbl,
2109                                             Eterm continuation,
2110                                             Eterm *ret,
2111                                             enum DbIterSafety* safety_p)
2112 {
2113     DbTreeStack stack;
2114     TreeDbTerm * stack_array[STACK_NEED];
2115     int result;
2116     CATreeRootIterator iter;
2117 
2118     init_root_iterator(&tbl->catree, &iter, 0);
2119     init_tree_stack(&stack, stack_array, 0);
2120     result = db_select_delete_continue_tree_common(p, tbl, continuation, ret,
2121                                                    &stack, &iter);
2122     destroy_root_iterator(&iter);
2123     return result;
2124 }
2125 
db_select_delete_catree(Process * p,DbTable * tbl,Eterm tid,Eterm pattern,Eterm * ret,enum DbIterSafety safety)2126 static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
2127                                    Eterm pattern, Eterm *ret,
2128                                    enum DbIterSafety safety)
2129 {
2130     DbTreeStack stack;
2131     TreeDbTerm * stack_array[STACK_NEED];
2132     int result;
2133     CATreeRootIterator iter;
2134 
2135     init_root_iterator(&tbl->catree, &iter, 0);
2136     init_tree_stack(&stack, stack_array, 0);
2137     result = db_select_delete_tree_common(p, tbl,
2138                                           tid, pattern, ret, &stack,
2139                                           &iter);
2140     destroy_root_iterator(&iter);
2141     return result;
2142 }
2143 
db_select_replace_catree(Process * p,DbTable * tbl,Eterm tid,Eterm pattern,Eterm * ret,enum DbIterSafety safety_p)2144 static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
2145                                     Eterm pattern, Eterm *ret,
2146                                     enum DbIterSafety safety_p)
2147 {
2148     int result;
2149     CATreeRootIterator iter;
2150 
2151     init_root_iterator(&tbl->catree, &iter, 0);
2152     result = db_select_replace_tree_common(p, tbl,
2153                                            tid, pattern, ret, NULL, &iter);
2154     destroy_root_iterator(&iter);
2155     return result;
2156 }
2157 
db_select_replace_continue_catree(Process * p,DbTable * tbl,Eterm continuation,Eterm * ret,enum DbIterSafety * safety_p)2158 static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
2159                                              Eterm continuation, Eterm *ret,
2160                                              enum DbIterSafety* safety_p)
2161 {
2162     int result;
2163     CATreeRootIterator iter;
2164 
2165     init_root_iterator(&tbl->catree, &iter, 0);
2166     result = db_select_replace_continue_tree_common(p, tbl, continuation, ret,
2167                                                     NULL, &iter);
2168     destroy_root_iterator(&iter);
2169     return result;
2170 }
2171 
db_take_catree(Process * p,DbTable * tbl,Eterm key,Eterm * ret)2172 static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
2173 {
2174     DbTableCATree *tb = &tbl->catree;
2175     FindBaseNode fbn;
2176     DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
2177     int result = db_take_tree_common(p, tbl, &node->u.base.root, key,
2178                                      ret, NULL);
2179     wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
2180     return result;
2181 }
2182 
2183 /*
2184 ** Other interface routines (not directly coupled to one bif)
2185 */
2186 
2187 
2188 /* Display tree contents (for dump) */
db_print_catree(fmtfn_t to,void * to_arg,int show,DbTable * tbl)2189 static void db_print_catree(fmtfn_t to, void *to_arg,
2190                             int show, DbTable *tbl)
2191 {
2192     CATreeRootIterator iter;
2193     TreeDbTerm** root;
2194 
2195     init_root_iterator(&tbl->catree, &iter, 1);
2196     root = catree_find_first_root(&iter);
2197     do {
2198         db_print_tree_common(to, to_arg, show, *root, tbl);
2199         root = catree_find_next_root(&iter, NULL);
2200     } while (root);
2201     destroy_root_iterator(&iter);
2202 }
2203 
2204 /* Release all memory occupied by a single table */
db_free_table_catree(DbTable * tbl)2205 static int db_free_table_catree(DbTable *tbl)
2206 {
2207     while (db_free_table_continue_catree(tbl, ERTS_SWORD_MAX) < 0)
2208 	;
2209     return 1;
2210 }
2211 
2212 static
db_catree_nr_of_items_deleted_wb_dtor(Binary * context_bin)2213 int db_catree_nr_of_items_deleted_wb_dtor(Binary *context_bin) {
2214     (void)context_bin;
2215     return 1;
2216 }
2217 
2218 typedef struct {
2219     Uint nr_of_deleted_items;
2220 } DbCATreeNrOfItemsDeletedWb;
2221 
2222 static Eterm
create_and_install_num_of_deleted_items_wb_bin(Process * p,DbTableCATree * tb)2223 create_and_install_num_of_deleted_items_wb_bin(Process *p, DbTableCATree *tb)
2224 {
2225     Binary* bin =
2226         erts_create_magic_binary(sizeof(DbCATreeNrOfItemsDeletedWb),
2227                                  db_catree_nr_of_items_deleted_wb_dtor);
2228     DbCATreeNrOfItemsDeletedWb* data = ERTS_MAGIC_BIN_DATA(bin);
2229     Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
2230     Eterm mref = erts_mk_magic_ref(&hp, &MSO(p), bin);
2231     data->nr_of_deleted_items = 0;
2232     tb->nr_of_deleted_items_wb = bin;
2233     erts_refc_inctest(&bin->intern.refc, 2);
2234     return mref;
2235 }
2236 
db_delete_all_objects_get_nitems_from_holder_catree(Process * p,Eterm mref)2237 static Eterm db_delete_all_objects_get_nitems_from_holder_catree(Process* p,
2238                                                                  Eterm mref)
2239 {
2240     Binary* bin = erts_magic_ref2bin(mref);
2241     DbCATreeNrOfItemsDeletedWb* data = ERTS_MAGIC_BIN_DATA(bin);
2242     return erts_make_integer(data->nr_of_deleted_items, p);
2243 }
2244 
db_delete_all_objects_catree(Process * p,DbTable * tbl,SWord reds,Eterm * nitems_holder_wb)2245 static SWord db_delete_all_objects_catree(Process* p,
2246                                           DbTable* tbl,
2247                                           SWord reds,
2248                                           Eterm* nitems_holder_wb)
2249 {
2250     DbTableCATree *tb = &tbl->catree;
2251     DbCATreeNrOfItemsDeletedWb* data;
2252     if (!tb->deletion) {
2253         *nitems_holder_wb =
2254             create_and_install_num_of_deleted_items_wb_bin(p, tb);
2255     }
2256     reds = db_free_table_continue_catree(tbl, reds);
2257     if (reds < 0)
2258         return reds;
2259     data = ERTS_MAGIC_BIN_DATA(tb->nr_of_deleted_items_wb);
2260     data->nr_of_deleted_items = tb->nr_of_deleted_items;
2261     erts_bin_release(tb->nr_of_deleted_items_wb);
2262     db_create_catree(p, tbl);
2263     return reds;
2264 }
2265 
2266 
do_for_route_nodes(DbTableCATreeNode * node,void (* func)(ErlOffHeap *,void *),void * arg)2267 static void do_for_route_nodes(DbTableCATreeNode* node,
2268                                void (*func)(ErlOffHeap *, void *),
2269                                void *arg)
2270 {
2271     ErlOffHeap tmp_offheap;
2272 
2273     if (!GET_LEFT(node)->is_base_node)
2274         do_for_route_nodes(GET_LEFT(node), func, arg);
2275 
2276     tmp_offheap.first = node->u.route.key.oh;
2277     tmp_offheap.overhead = 0;
2278     (*func)(&tmp_offheap, arg);
2279 
2280     if (!GET_RIGHT(node)->is_base_node)
2281         do_for_route_nodes(GET_RIGHT(node), func, arg);
2282 }
2283 
db_foreach_offheap_catree(DbTable * tbl,void (* func)(ErlOffHeap *,void *),void * arg)2284 static void db_foreach_offheap_catree(DbTable *tbl,
2285                                       void (*func)(ErlOffHeap *, void *),
2286                                       void *arg)
2287 {
2288     DbTableCATree* tb = &tbl->catree;
2289     CATreeRootIterator iter;
2290     TreeDbTerm** root;
2291 
2292     if (!GET_ROOT(tb)) {
2293         ASSERT(tb->common.status & DB_DELETE);
2294         return;
2295     }
2296     init_root_iterator(tb, &iter, 1);
2297     root = catree_find_first_root(&iter);
2298     do {
2299         db_foreach_offheap_tree_common(*root, func, arg);
2300         root = catree_find_next_root(&iter, NULL);
2301     } while (root);
2302     destroy_root_iterator(&iter);
2303 
2304     do_for_route_nodes(GET_ROOT(tb), func, arg);
2305 }
2306 
db_lookup_dbterm_catree(Process * p,DbTable * tbl,Eterm key,Eterm obj,DbUpdateHandle * handle)2307 static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
2308                                    DbUpdateHandle *handle)
2309 {
2310     DbTableCATree *tb = &tbl->catree;
2311     FindBaseNode fbn;
2312     DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
2313     int res = db_lookup_dbterm_tree_common(p, tbl, &node->u.base.root, key,
2314                                            obj, handle, NULL);
2315     if (res == 0) {
2316         wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
2317     } else {
2318         /* db_finalize_dbterm_catree will unlock */
2319         handle->u.catree.base_node = node;
2320         handle->u.catree.parent = fbn.parent;
2321         handle->u.catree.current_level = fbn.current_level;
2322     }
2323     return res;
2324 }
2325 
db_finalize_dbterm_catree(int cret,DbUpdateHandle * handle)2326 static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle)
2327 {
2328     DbTableCATree *tb = &(handle->tb->catree);
2329     db_finalize_dbterm_tree_common(cret,
2330                                    handle,
2331                                    &handle->u.catree.base_node->u.base.root,
2332                                    NULL);
2333     wunlock_adapt_base_node(tb, handle->u.catree.base_node,
2334                             handle->u.catree.parent,
2335                             handle->u.catree.current_level);
2336     return;
2337 }
2338 
db_get_binary_info_catree(Process * p,DbTable * tbl,Eterm key,Eterm * ret)2339 static int db_get_binary_info_catree(Process *p, DbTable *tbl, Eterm key,
2340                                      Eterm *ret)
2341 {
2342     DbTableCATree *tb = &tbl->catree;
2343     FindBaseNode fbn;
2344     DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
2345     TreeDbTerm* this = db_find_tree_node_common(&tbl->common, node->u.base.root,
2346                                                 key);
2347     *ret = db_binary_info_tree_common(p, this);
2348     wunlock_base_node(node);
2349     return DB_ERROR_NONE;
2350 }
2351 
2352 #ifdef ERTS_ENABLE_LOCK_COUNT
erts_lcnt_enable_db_catree_lock_count_helper(DbTableCATree * tb,DbTableCATreeNode * node,int enable)2353 static void erts_lcnt_enable_db_catree_lock_count_helper(DbTableCATree *tb,
2354                                                          DbTableCATreeNode *node,
2355                                                          int enable)
2356 {
2357     erts_lcnt_ref_t *lcnt_ref;
2358     erts_lock_flags_t lock_type;
2359     if (node->is_base_node) {
2360         lcnt_ref = &GET_BASE_NODE_LOCK(node)->lcnt;
2361         lock_type = ERTS_LOCK_TYPE_RWMUTEX;
2362     } else {
2363         erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_LEFT(node), enable);
2364         erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_RIGHT(node), enable);
2365         lcnt_ref = &GET_ROUTE_NODE_LOCK(node)->lcnt;
2366         lock_type = ERTS_LOCK_TYPE_MUTEX;
2367     }
2368     if (enable) {
2369         erts_lcnt_install_new_lock_info(lcnt_ref, "db_hash_slot", tb->common.the_name,
2370                                         lock_type | ERTS_LOCK_FLAGS_CATEGORY_DB);
2371     } else {
2372         erts_lcnt_uninstall(lcnt_ref);
2373     }
2374 }
2375 
erts_lcnt_enable_db_catree_lock_count(DbTableCATree * tb,int enable)2376 void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable)
2377 {
2378     erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_ROOT(tb), enable);
2379 }
2380 #endif /* ERTS_ENABLE_LOCK_COUNT */
2381 
db_catree_force_split(DbTableCATree * tb,int on)2382 void db_catree_force_split(DbTableCATree* tb, int on)
2383 {
2384     CATreeRootIterator iter;
2385     TreeDbTerm** root;
2386 
2387     init_root_iterator(tb, &iter, 1);
2388     root = catree_find_first_root(&iter);
2389     do {
2390         BASE_NODE_STAT_SET(iter.locked_bnode, (on ? INT_MAX : 0));
2391         root = catree_find_next_root(&iter, NULL);
2392     } while (root);
2393     destroy_root_iterator(&iter);
2394 
2395     if (on)
2396         tb->common.status |= DB_CATREE_FORCE_SPLIT;
2397     else
2398         tb->common.status &= ~DB_CATREE_FORCE_SPLIT;
2399 }
2400 
db_catree_debug_random_split_join(DbTableCATree * tb,int on)2401 void db_catree_debug_random_split_join(DbTableCATree* tb, int on)
2402 {
2403     if (on)
2404         tb->common.status |= DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN;
2405     else
2406         tb->common.status &= ~DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN;
2407 }
2408 
db_calc_stats_catree(DbTableCATree * tb,DbCATreeStats * stats)2409 void db_calc_stats_catree(DbTableCATree* tb, DbCATreeStats* stats)
2410 {
2411     DbTableCATreeNode* stack[ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT];
2412     DbTableCATreeNode* node;
2413     Uint depth = 0;
2414 
2415     stats->route_nodes = 0;
2416     stats->base_nodes = 0;
2417     stats->max_depth = 0;
2418 
2419     node = GET_ROOT(tb);
2420     do {
2421         while (!node->is_base_node) {
2422             stats->route_nodes++;
2423             ASSERT(depth < sizeof(stack)/sizeof(*stack));
2424             stack[depth++] = node;  /* PUSH parent */
2425             if (stats->max_depth < depth)
2426                 stats->max_depth = depth;
2427             node = GET_LEFT(node);
2428         }
2429         stats->base_nodes++;
2430 
2431         while (depth > 0) {
2432             DbTableCATreeNode* parent = stack[depth-1];
2433             if (node == GET_LEFT(parent)) {
2434                 node = GET_RIGHT(parent);
2435                 break;
2436             }
2437             else {
2438                 ASSERT(node == GET_RIGHT(parent));
2439                 node = parent;
2440                 depth--; /* POP parent */
2441             }
2442         }
2443     } while (depth > 0);
2444 }
2445 
2446 struct debug_catree_fa {
2447     void (*func)(ErlOffHeap *, void *);
2448     void *arg;
2449 };
2450 
debug_free_route_node(void * vfap,ErtsThrPrgrVal val,void * vnp)2451 static void debug_free_route_node(void *vfap, ErtsThrPrgrVal val, void *vnp)
2452 {
2453     DbTableCATreeNode *np = vnp;
2454     if (np->u.route.key.oh) {
2455         struct debug_catree_fa *fap = vfap;
2456         ErlOffHeap oh;
2457         ERTS_INIT_OFF_HEAP(&oh);
2458         oh.first = np->u.route.key.oh;
2459         (*fap->func)(&oh, fap->arg);
2460     }
2461 }
2462 
2463 void
erts_db_foreach_thr_prgr_offheap_catree(void (* func)(ErlOffHeap *,void *),void * arg)2464 erts_db_foreach_thr_prgr_offheap_catree(void (*func)(ErlOffHeap *, void *),
2465                                         void *arg)
2466 {
2467     struct debug_catree_fa fa;
2468     fa.func = func;
2469     fa.arg = arg;
2470     erts_debug_later_op_foreach(do_free_route_node, debug_free_route_node, &fa);
2471 }
2472 
2473 
2474 #ifdef HARDDEBUG
2475 
2476 /*
2477  * Not called, but kept as it might come to use
2478  */
my_check_table_tree(TreeDbTerm * t)2479 static inline int my_check_table_tree(TreeDbTerm *t)
2480 {
2481     int lh, rh;
2482     if (t == NULL)
2483 	return 0;
2484     lh = my_check_table_tree(t->left);
2485     rh = my_check_table_tree(t->right);
2486     if ((rh - lh) != t->balance) {
2487 	erts_fprintf(stderr, "Invalid tree balance for this node:\n");
2488 	erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n",
2489 		     t->balance, t->left, t->right);
2490 	erts_fprintf(stderr,"\nDump:\n---------------------------------\n");
2491 	erts_fprintf(stderr,"\n---------------------------------\n");
2492         abort();
2493     }
2494     return ((rh > lh) ? rh : lh) + 1;
2495 }
2496 
2497 #endif
2498