1 /*
2 * %CopyrightBegin%
3 *
4 * Copyright Ericsson AB and Kjell Winblad 1998-2020. All Rights Reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * %CopyrightEnd%
19 */
20
21 /*
22 * Description: Implementation of ETS ordered_set table type with
23 * fine-grained synchronization.
24 *
25 * Author: Kjell Winblad
26 *
27 * This implementation is based on the contention adapting search tree
28 * (CA tree). The CA tree is a concurrent data structure that
29 * dynamically adapts its synchronization granularity based on how
30 * much contention is detected in locks. The following publication
31 * contains a detailed description of CA trees:
32 *
33 * A Contention Adapting Approach to Concurrent Ordered Sets
34 * Journal of Parallel and Distributed Computing, 2018
35 * Kjell Winblad and Konstantinos Sagonas
36 * https://doi.org/10.1016/j.jpdc.2017.11.007
37 *
38 * The following publication may also be interesting as it discusses
39 * how the CA tree can be used as an ETS ordered_set table type
40 * backend:
41 *
42 * More Scalable Ordered Set for ETS Using Adaptation
43 * In Thirteenth ACM SIGPLAN workshop on Erlang (2014)
44 * Kjell Winblad and Konstantinos Sagonas
45 * https://doi.org/10.1145/2633448.2633455
46 *
47 * This implementation of the ordered_set ETS table type is only
48 * activated when the options {write_concurrency, true}, public and
49 * ordered_set are passed to the ets:new/2 function. This
50 * implementation is expected to scale better than the default
51 * implementation located in "erl_db_tree.c".
52 *
53 * The default implementation has a static stack optimization (see
54 * get_static_stack in erl_db_tree.c). This implementation does not
55 * have such an optimization as it induces bad scalability when
56 * concurrent read operations are frequent (they all try to get hold
57 * of the same stack). The default implementation may thus perform
58 * better compared to this implementation in scenarios where the
59 * static stack optimization is useful. One such scenario is when only
60 * one process is accessing the table and this process is traversing
61 * the table with a sequence of next/2 calls.
62 */
63
64 #ifdef HAVE_CONFIG_H
65 # include "config.h"
66 #endif
67
68 #include "sys.h"
69 #include "erl_vm.h"
70 #include "global.h"
71 #include "erl_process.h"
72 #include "error.h"
73 #define ERTS_WANT_DB_INTERNAL__
74 #include "erl_db.h"
75 #include "bif.h"
76 #include "big.h"
77 #include "erl_binary.h"
78
79 #include "erl_db_catree.h"
80 #include "erl_db_tree.h"
81 #include "erl_db_tree_util.h"
82
/*
 * IF_DEBUG(X) expands to X when DEBUG is defined and to nothing
 * otherwise, i.e. X is only compiled into debug builds.
 */
#ifdef DEBUG
# define IF_DEBUG(X) X
#else
# define IF_DEBUG(X)
#endif
88
/*
** Forward declarations
*/

/* Declared here only; the definition is not in this part of the file */
static SWord do_delete_base_node_cont(DbTableCATree *tb,
                                      DbTableCATreeNode *base_node,
                                      SWord num_left);

/*
 * Method interface functions
 *
 * These are the functions that the db_catree method table is
 * initialized with.
 */
static int db_first_catree(Process *p, DbTable *tbl,
                           Eterm *ret);
static int db_next_catree(Process *p, DbTable *tbl,
                          Eterm key, Eterm *ret);
static int db_last_catree(Process *p, DbTable *tbl,
                          Eterm *ret);
static int db_prev_catree(Process *p, DbTable *tbl,
                          Eterm key,
                          Eterm *ret);
static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail,
                         SWord *consumed_reds_p);
static int db_get_catree(Process *p, DbTable *tbl,
                         Eterm key, Eterm *ret);
static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret);
static int db_get_element_catree(Process *p, DbTable *tbl,
                                 Eterm key,int ndex,
                                 Eterm *ret);
static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret);
static int db_erase_object_catree(DbTable *tbl, Eterm object,Eterm *ret);
static int db_slot_catree(Process *p, DbTable *tbl,
                          Eterm slot_term, Eterm *ret);
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
                            Eterm pattern, int reversed, Eterm *ret,
                            enum DbIterSafety);
static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Eterm *ret, enum DbIterSafety);
static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Sint chunk_size,
                                  int reversed, Eterm *ret, enum DbIterSafety);
static int db_select_continue_catree(Process *p, DbTable *tbl,
                                     Eterm continuation, Eterm *ret,
                                     enum DbIterSafety*);
static int db_select_count_continue_catree(Process *p, DbTable *tbl,
                                           Eterm continuation, Eterm *ret,
                                           enum DbIterSafety*);
static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
                                   Eterm pattern, Eterm *ret,
                                   enum DbIterSafety);
static int db_select_delete_continue_catree(Process *p, DbTable *tbl,
                                            Eterm continuation, Eterm *ret,
                                            enum DbIterSafety*);
static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
                                    Eterm pattern, Eterm *ret,
                                    enum DbIterSafety);
static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
                                             Eterm continuation, Eterm *ret,
                                             enum DbIterSafety*);
static int db_take_catree(Process *, DbTable *, Eterm, Eterm *);
static void db_print_catree(fmtfn_t to, void *to_arg,
                            int show, DbTable *tbl);
static int db_free_table_catree(DbTable *tbl);
static SWord db_free_table_continue_catree(DbTable *tbl, SWord);
static void db_foreach_offheap_catree(DbTable *,
                                      void (*)(ErlOffHeap *, void *),
                                      void *);
static SWord db_delete_all_objects_catree(Process* p,
                                          DbTable* tbl,
                                          SWord reds,
                                          Eterm* nitems_holder_wb);
static Eterm db_delete_all_objects_get_nitems_from_holder_catree(Process* p,
                                                                 Eterm nitems_holder);
static int
db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj,
                        DbUpdateHandle*);
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *);
static int db_get_binary_info_catree(Process*, DbTable*, Eterm key, Eterm *ret);
static int db_put_dbterm_catree(DbTable* tbl,
                                void* obj,
                                int key_clash_fail,
                                SWord *consumed_reds_p);

/*
 * Internal helpers: CA tree adaptation (split/join) and base node
 * write locking.
 */
static void split_catree(DbTableCATree *tb,
                         DbTableCATreeNode* ERTS_RESTRICT base,
                         DbTableCATreeNode* ERTS_RESTRICT parent);
static void join_catree(DbTableCATree *tb,
                        DbTableCATreeNode *thiz,
                        DbTableCATreeNode *parent);
static ERTS_INLINE
int try_wlock_base_node(DbTableCATreeBaseNode *base_node);
static ERTS_INLINE
void wunlock_base_node(DbTableCATreeNode *base_node);
static ERTS_INLINE
void wlock_base_node_no_stats(DbTableCATreeNode *base_node);
static ERTS_INLINE
void wunlock_adapt_base_node(DbTableCATree* tb,
                             DbTableCATreeNode* node,
                             DbTableCATreeNode* parent,
                             int current_level);
/*
** External interface
*/

/*
 * Method table for the CA tree based table type.
 *
 * The initializer is positional (no designators are used), so the
 * entries must be kept in the same order as the members of
 * DbTableMethod. The *_tree_common entries and db_create_catree are
 * not declared in this file section; presumably they come from the
 * included erl_db_tree_util.h / erl_db_catree.h headers (confirm).
 */
DbTableMethod db_catree =
{
    db_create_catree,
    db_first_catree,
    db_next_catree,
    db_last_catree,
    db_prev_catree,
    db_put_catree,
    db_get_catree,
    db_get_element_catree,
    db_member_catree,
    db_erase_catree,
    db_erase_object_catree,
    db_slot_catree,
    db_select_chunk_catree,
    db_select_catree,
    db_select_delete_catree,
    db_select_continue_catree,
    db_select_delete_continue_catree,
    db_select_count_catree,
    db_select_count_continue_catree,
    db_select_replace_catree,
    db_select_replace_continue_catree,
    db_take_catree,
    db_delete_all_objects_catree,
    db_delete_all_objects_get_nitems_from_holder_catree,
    db_free_table_catree,
    db_free_table_continue_catree,
    db_print_catree,
    db_foreach_offheap_catree,
    db_lookup_dbterm_catree,
    db_finalize_dbterm_catree,
    db_eterm_to_dbterm_tree_common,
    db_dbterm_list_prepend_tree_common,
    db_dbterm_list_remove_first_tree_common,
    db_put_dbterm_catree,
    db_free_dbterm_tree_common,
    db_get_dbterm_key_tree_common,
    db_get_binary_info_catree,
    db_first_catree, /* raw_first same as first */
    db_next_catree   /* raw_next same as next */
};
231
/*
 * Constants
 *
 * Every base node has a "lock statistics" counter. Contended lock
 * attempts increase it and uncontended ones decrease it; the counter
 * is compared against the limits below when a write lock is released
 * in order to decide if the base node should be split or joined.
 */

/* Added to the statistics when a try-lock attempt found the lock busy */
#define ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION 250
/* Added to the statistics when a write lock was taken at the first attempt */
#define ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION (-1)
/* Added to the statistics of the base node picked by do_random_join() */
#define ERL_DB_CATREE_LOCK_GRAVITY_CONTRIBUTION (-500)
/* do_random_join() is only called when a random number has none of these bits set */
#define ERL_DB_CATREE_LOCK_GRAVITY_PATTERN (0xFF800000)
/* Not referenced in this part of the file */
#define ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION (-10)
/* Statistics above this value make a base node a candidate for a split */
#define ERL_DB_CATREE_HIGH_CONTENTION_LIMIT 1000
/* Statistics below this value make a base node a candidate for a join */
#define ERL_DB_CATREE_LOW_CONTENTION_LIMIT (-1000)
/* A base node is only split when its level is below this value */
#define ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT 16
/* BASE_NODE_STAT_SUB leaves the statistics untouched at or below this value */
#define ERL_DB_CATREE_LOCK_LOW_NO_CONTRIBUTION_LIMIT (-20000)
/* BASE_NODE_STAT_ADD leaves the statistics untouched at or above this value */
#define ERL_DB_CATREE_LOCK_HIGH_NO_CONTRIBUTION_LIMIT (20000)
246
/*
 * Internal CA tree related helper functions and macros
 */

/*
 * Field accessors for CA tree nodes. "node" is a pointer expression;
 * it is parenthesized in the expansions so that arguments such as
 * "nodes + 1" or "cond ? a : b" are handled correctly.
 *
 * GET_ROUTE_NODE_KEY  - the key term of a route node
 * GET_BASE_NODE_LOCK  - address of the lock of a base node
 * GET_ROUTE_NODE_LOCK - address of the lock of a route node
 */
#define GET_ROUTE_NODE_KEY(node) ((node)->u.route.key.term)
#define GET_BASE_NODE_LOCK(node) (&((node)->u.base.lock))
#define GET_ROUTE_NODE_LOCK(node) (&((node)->u.route.lock))
254
255
/*
 * Helpers for reading and writing shared atomic variables
 *
 * The macro arguments are pointer expressions and are parenthesized
 * in the expansions so that any expression can be passed safely.
 *
 * NOTE: the SET_LEFT*() and SET_RIGHT*() macros end with a semicolon
 * of their own. It is kept for compatibility with existing call
 * sites, which means that these macros cannot be used as the sole
 * unbraced body of an "if" that has an "else" branch.
 */

/* No memory barrier */
#define GET_ROOT(tb) ((DbTableCATreeNode*)erts_atomic_read_nob(&((tb)->root)))
#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&((ca_tree_route_node)->u.route.left)))
#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&((ca_tree_route_node)->u.route.right)))
#define SET_ROOT(tb, v) erts_atomic_set_nob(&((tb)->root), (erts_aint_t)(v))
#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&((ca_tree_route_node)->u.route.left), (erts_aint_t)(v));
#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&((ca_tree_route_node)->u.route.right), (erts_aint_t)(v));


/* Release or acquire barriers */
#define GET_ROOT_ACQB(tb) ((DbTableCATreeNode*)erts_atomic_read_acqb(&((tb)->root)))
#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&((ca_tree_route_node)->u.route.left)))
#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&((ca_tree_route_node)->u.route.right)))
#define SET_ROOT_RELB(tb, v) erts_atomic_set_relb(&((tb)->root), (erts_aint_t)(v))
#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&((ca_tree_route_node)->u.route.left), (erts_aint_t)(v));
#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&((ca_tree_route_node)->u.route.right), (erts_aint_t)(v));
274
/*
 * Change base node lock statistics
 *
 * BASE_NODE_STAT_SET  - overwrite the statistics with VALUE
 * BASE_NODE_STAT_READ - read the statistics
 * BASE_NODE_STAT_ADD  - add a positive VALUE, unless the statistics
 *                       already are at or above
 *                       ERL_DB_CATREE_LOCK_HIGH_NO_CONTRIBUTION_LIMIT
 * BASE_NODE_STAT_SUB  - add a negative VALUE, unless the statistics
 *                       already are at or below
 *                       ERL_DB_CATREE_LOCK_LOW_NO_CONTRIBUTION_LIMIT
 *
 * All accesses are done without memory barriers, and ADD/SUB are a
 * separate read followed by a separate write (not one atomic
 * read-modify-write operation).
 *
 * NODE and VALUE are parenthesized in the expansions so that any
 * expression can be passed. ADD and SUB evaluate NODE and VALUE more
 * than once, so the arguments should not have side effects. ADD and
 * SUB end with a semicolon of their own; it is kept for compatibility
 * with existing call sites, which means that they cannot be used as
 * the sole unbraced body of an "if" that has an "else" branch.
 */
#define BASE_NODE_STAT_SET(NODE, VALUE) erts_atomic_set_nob(&(NODE)->u.base.lock_statistics, (VALUE))
#define BASE_NODE_STAT_READ(NODE) erts_atomic_read_nob(&(NODE)->u.base.lock_statistics)
#define BASE_NODE_STAT_ADD(NODE, VALUE)                                 \
    do {                                                                \
        Sint v = erts_atomic_read_nob(&((NODE)->u.base.lock_statistics)); \
        ASSERT((VALUE) > 0);                                            \
        if(v < ERL_DB_CATREE_LOCK_HIGH_NO_CONTRIBUTION_LIMIT) {         \
            erts_atomic_set_nob(&((NODE)->u.base.lock_statistics), v + (VALUE)); \
        }                                                               \
    }while(0);
#define BASE_NODE_STAT_SUB(NODE, VALUE)                                 \
    do {                                                                \
        Sint v = erts_atomic_read_nob(&((NODE)->u.base.lock_statistics)); \
        ASSERT((VALUE) < 0);                                            \
        if(v > ERL_DB_CATREE_LOCK_LOW_NO_CONTRIBUTION_LIMIT) {          \
            erts_atomic_set_nob(&((NODE)->u.base.lock_statistics), v + (VALUE)); \
        }                                                               \
    }while(0);
294
295
296 /* Compares a key to the key in a route node */
cmp_key_route(Eterm key,DbTableCATreeNode * obj)297 static ERTS_INLINE Sint cmp_key_route(Eterm key,
298 DbTableCATreeNode *obj)
299 {
300 return CMP(key, GET_ROUTE_NODE_KEY(obj));
301 }
302
303 /*
304 * Used by the split_tree function
305 */
306 static ERTS_INLINE
less_than_two_elements(TreeDbTerm * root)307 int less_than_two_elements(TreeDbTerm *root)
308 {
309 return root == NULL || (root->left == NULL && root->right == NULL);
310 }
311
/*
 * Inserts a TreeDbTerm into a tree. Returns the new root.
 *
 * tb              - the table; used to extract (GETKEY) and to compare
 *                   (cmp_key) keys
 * insert_to_root  - root of the AVL tree to insert into; may be NULL,
 *                   in which case value_to_insert becomes the root
 * value_to_insert - the node to link in; its balance, left and right
 *                   fields are overwritten
 *
 * A key that does not compare less than the key of a visited node is
 * placed in the right subtree of that node, i.e. equal keys are not
 * detected by this function.
 */
static ERTS_INLINE
TreeDbTerm* insert_TreeDbTerm(DbTableCATree *tb,
                              TreeDbTerm *insert_to_root,
                              TreeDbTerm *value_to_insert) {
    /* Non recursive insertion in AVL tree, building our own stack */
    TreeDbTerm **tstack[STACK_NEED]; /* the links followed on the way down */
    int tpos = 0;
    int dstack[STACK_NEED+1];        /* the direction taken at each link */
    int dpos = 0;
    int state = 0;                   /* non-zero while a height increase
                                        still has to be propagated upwards */
    TreeDbTerm * base = insert_to_root;
    TreeDbTerm **this = &base;
    Sint c;
    Eterm key;
    int dir;
    TreeDbTerm *p1, *p2, *p;

    key = GETKEY(tb, value_to_insert->dbterm.tpl);

    /* Sentinel that stops the rebalancing loop at the root */
    dstack[dpos++] = DIR_END;
    for (;;)
        if (!*this) { /* Found our place */
            state = 1;
            *this = value_to_insert;
            (*this)->balance = 0;
            (*this)->left = (*this)->right = NULL;
            break;
        } else if ((c = cmp_key(&tb->common, key, *this)) < 0) {
            /* go lefts */
            dstack[dpos++] = DIR_LEFT;
            tstack[tpos++] = this;
            this = &((*this)->left);
        } else { /* go right */
            dstack[dpos++] = DIR_RIGHT;
            tstack[tpos++] = this;
            this = &((*this)->right);
        }

    /*
     * Walk back towards the root and adjust the balance factors. The
     * walk stops as soon as a subtree has kept its previous height
     * (state == 0) or when the sentinel is popped.
     */
    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
        this = tstack[--tpos];
        p = *this;
        if (dir == DIR_LEFT) {
            /* The left subtree of p has grown */
            switch (p->balance) {
            case 1:
                p->balance = 0;
                state = 0;
                break;
            case 0:
                p->balance = -1;
                break;
            case -1: /* The icky case */
                p1 = p->left;
                if (p1->balance == -1) { /* Single LL rotation */
                    p->left = p1->right;
                    p1->right = p;
                    p->balance = 0;
                    (*this) = p1;
                } else { /* Double LR rotation */
                    ASSERT(p1->right);
                    p2 = p1->right;
                    p1->right = p2->left;
                    p2->left = p1;
                    p->left = p2->right;
                    p2->right = p;
                    p->balance = (p2->balance == -1) ? +1 : 0;
                    p1->balance = (p2->balance == 1) ? -1 : 0;
                    (*this) = p2;
                }
                (*this)->balance = 0;
                state = 0;
                break;
            }
        } else { /* dir == DIR_RIGHT */
            /* The right subtree of p has grown (mirror of the case above) */
            switch (p->balance) {
            case -1:
                p->balance = 0;
                state = 0;
                break;
            case 0:
                p->balance = 1;
                break;
            case 1:
                p1 = p->right;
                if (p1->balance == 1) { /* Single RR rotation */
                    p->right = p1->left;
                    p1->left = p;
                    p->balance = 0;
                    (*this) = p1;
                } else { /* Double RL rotation */
                    ASSERT(p1->left);
                    p2 = p1->left;
                    p1->left = p2->right;
                    p2->right = p1;
                    p->right = p2->left;
                    p2->left = p;
                    p->balance = (p2->balance == 1) ? -1 : 0;
                    p1->balance = (p2->balance == -1) ? 1 : 0;
                    (*this) = p2;
                }
                (*this)->balance = 0;
                state = 0;
                break;
            }
        }
    }
    return base;
}
422
423 /*
424 * Split an AVL tree into two trees. The function stores the node
425 * containing the "split key" in the write back parameter
426 * split_key_wb. The function stores the left tree containing the keys
427 * that are smaller than the "split key" in the write back parameter
428 * left_wb and the tree containing the rest of the keys in the write
429 * back parameter right_wb.
430 */
split_tree(DbTableCATree * tb,TreeDbTerm * root,TreeDbTerm ** split_key_node_wb,TreeDbTerm ** left_wb,TreeDbTerm ** right_wb)431 static void split_tree(DbTableCATree *tb,
432 TreeDbTerm *root,
433 TreeDbTerm **split_key_node_wb,
434 TreeDbTerm **left_wb,
435 TreeDbTerm **right_wb) {
436 TreeDbTerm * split_node = NULL;
437 TreeDbTerm * left_root;
438 TreeDbTerm * right_root;
439 if (root->left == NULL) { /* To get non empty split */
440 *right_wb = root->right;
441 *split_key_node_wb = root->right;
442 root->right = NULL;
443 root->balance = 0;
444 *left_wb = root;
445 return;
446 }
447 split_node = root;
448 left_root = split_node->left;
449 split_node->left = NULL;
450 right_root = split_node->right;
451 split_node->right = NULL;
452 right_root = insert_TreeDbTerm(tb, right_root, split_node);
453 *split_key_node_wb = split_node;
454 *left_wb = left_root;
455 *right_wb = right_root;
456 }
457
458 /*
459 * Used by the join_trees function
460 */
compute_tree_hight(TreeDbTerm * root)461 static ERTS_INLINE int compute_tree_hight(TreeDbTerm * root)
462 {
463 if(root == NULL) {
464 return 0;
465 } else {
466 TreeDbTerm * current_node = root;
467 int hight_so_far = 1;
468 while (current_node->left != NULL || current_node->right != NULL) {
469 if (current_node->balance == -1) {
470 ASSERT(current_node->left != NULL);
471 current_node = current_node->left;
472 } else {
473 ASSERT(current_node->right != NULL);
474 current_node = current_node->right;
475 }
476 hight_so_far = hight_so_far + 1;
477 }
478 return hight_so_far;
479 }
480 }
481
482 /*
483 * Used by the join_trees function
484 */
485 static ERTS_INLINE
linkout_min_or_max_tree_node(TreeDbTerm ** root,int is_min)486 TreeDbTerm* linkout_min_or_max_tree_node(TreeDbTerm **root, int is_min)
487 {
488 TreeDbTerm **tstack[STACK_NEED];
489 int tpos = 0;
490 int dstack[STACK_NEED+1];
491 int dpos = 0;
492 int state = 0;
493 TreeDbTerm **this = root;
494 int dir;
495 TreeDbTerm *q = NULL;
496
497 dstack[dpos++] = DIR_END;
498 for (;;) {
499 if (!*this) { /* Failure */
500 return NULL;
501 } else if (is_min && (*this)->left != NULL) {
502 dstack[dpos++] = DIR_LEFT;
503 tstack[tpos++] = this;
504 this = &((*this)->left);
505 } else if (!is_min && (*this)->right != NULL) {
506 dstack[dpos++] = DIR_RIGHT;
507 tstack[tpos++] = this;
508 this = &((*this)->right);
509 } else { /* Min value, found the one to splice out */
510 q = (*this);
511 if (q->right == NULL) {
512 (*this) = q->left;
513 state = 1;
514 } else if (q->left == NULL) {
515 (*this) = q->right;
516 state = 1;
517 }
518 break;
519 }
520 }
521 while (state && ( dir = dstack[--dpos] ) != DIR_END) {
522 this = tstack[--tpos];
523 if (dir == DIR_LEFT) {
524 state = tree_balance_left(this);
525 } else {
526 state = tree_balance_right(this);
527 }
528 }
529 return q;
530 }
531
532 #define LINKOUT_MIN_TREE_NODE(root) linkout_min_or_max_tree_node(root, 1)
533 #define LINKOUT_MAX_TREE_NODE(root) linkout_min_or_max_tree_node(root, 0)
534
/*
 * Joins two AVL trees where all the keys in the left one are smaller
 * than the keys in the right one and returns the resulting tree.
 *
 * The algorithm is described on page 474 in D. E. Knuth. The Art of
 * Computer Programming: Sorting and Searching,
 * vol. 3. Addison-Wesley, 2nd edition, 1998.
 *
 * Outline of the implementation:
 *
 * 1. If one of the trees is empty, the other one is returned as is.
 * 2. A "joining node" is unlinked from the tree that is not higher
 *    than the other one (the minimum of the right tree, or the
 *    maximum of the left tree).
 * 3. The inner spine of the higher tree is followed until a subtree
 *    is reached that is at most one level higher than what is left
 *    of the lower tree. The joining node takes the place of that
 *    subtree and gets the subtree and the lower tree as its children.
 * 4. The path back to the root is rebalanced in the same way as
 *    after an insertion.
 */
static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
                              TreeDbTerm *right_root_param)
{
    TreeDbTerm **tstack[STACK_NEED]; /* the links followed on the way down */
    int tpos = 0;
    int dstack[STACK_NEED+1];        /* the direction taken at each link */
    int dpos = 0;
    int state = 1;                   /* non-zero while a height increase
                                        still has to be propagated upwards */
    TreeDbTerm **this;
    int dir;
    TreeDbTerm *p1, *p2, *p;
    TreeDbTerm *left_root = left_root_param;
    TreeDbTerm *right_root = right_root_param;
    int left_height;
    int right_height;
    int current_height;
    /* Sentinel that stops the rebalancing loop at the root */
    dstack[dpos++] = DIR_END;
    if (left_root == NULL) {
        return right_root;
    } else if (right_root == NULL) {
        return left_root;
    }

    left_height = compute_tree_hight(left_root);
    right_height = compute_tree_hight(right_root);
    if (left_height >= right_height) {
        /* The smallest node of the right tree becomes the joining node */
        TreeDbTerm * new_root =
            LINKOUT_MIN_TREE_NODE(&right_root);
        int new_right_height = compute_tree_hight(right_root);
        TreeDbTerm * current_node = left_root;
        this = &left_root;
        current_height = left_height;
        /* Go down the right spine of the left tree */
        while(current_height > new_right_height + 1) {
            /* A balance of -1 means that the right subtree is the
               lower one, i.e. two levels below current_node */
            if (current_node->balance == -1) {
                current_height = current_height - 2;
            } else {
                current_height = current_height - 1;
            }
            dstack[dpos++] = DIR_RIGHT;
            tstack[tpos++] = this;
            this = &((*this)->right);
            current_node = current_node->right;
        }
        new_root->left = current_node;
        new_root->right = right_root;
        new_root->balance = new_right_height - current_height;
        *this = new_root;
    } else {
        /* This case is symmetric to the previous case */
        TreeDbTerm * new_root =
            LINKOUT_MAX_TREE_NODE(&left_root);
        int new_left_height = compute_tree_hight(left_root);
        TreeDbTerm * current_node = right_root;
        this = &right_root;
        current_height = right_height;
        while (current_height > new_left_height + 1) {
            if (current_node->balance == 1) {
                current_height = current_height - 2;
            } else {
                current_height = current_height - 1;
            }
            dstack[dpos++] = DIR_LEFT;
            tstack[tpos++] = this;
            this = &((*this)->left);
            current_node = current_node->left;
        }
        new_root->right = current_node;
        new_root->left = left_root;
        new_root->balance = current_height - new_left_height;
        *this = new_root;
    }
    /* Now we need to continue as if this was during the insert */
    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
        this = tstack[--tpos];
        p = *this;
        if (dir == DIR_LEFT) {
            switch (p->balance) {
            case 1:
                p->balance = 0;
                state = 0;
                break;
            case 0:
                p->balance = -1;
                break;
            case -1: /* The icky case */
                p1 = p->left;
                if (p1->balance == -1) { /* Single LL rotation */
                    p->left = p1->right;
                    p1->right = p;
                    p->balance = 0;
                    (*this) = p1;
                } else { /* Double LR rotation */
                    ASSERT(p1->right);
                    p2 = p1->right;
                    p1->right = p2->left;
                    p2->left = p1;
                    p->left = p2->right;
                    p2->right = p;
                    p->balance = (p2->balance == -1) ? +1 : 0;
                    p1->balance = (p2->balance == 1) ? -1 : 0;
                    (*this) = p2;
                }
                (*this)->balance = 0;
                state = 0;
                break;
            }
        } else { /* dir == DIR_RIGHT */
            switch (p->balance) {
            case -1:
                p->balance = 0;
                state = 0;
                break;
            case 0:
                p->balance = 1;
                break;
            case 1:
                p1 = p->right;
                if (p1->balance == 1) { /* Single RR rotation */
                    p->right = p1->left;
                    p1->left = p;
                    p->balance = 0;
                    (*this) = p1;
                } else { /* Double RL rotation */
                    ASSERT(p1->left);
                    p2 = p1->left;
                    p1->left = p2->right;
                    p2->right = p1;
                    p->right = p2->left;
                    p2->left = p;
                    p->balance = (p2->balance == 1) ? -1 : 0;
                    p1->balance = (p2->balance == -1) ? 1 : 0;
                    (*this) = p2;
                }
                (*this)->balance = 0;
                state = 0;
                break;
            }
        }
    }
    /* Return the joined tree */
    if (left_height >= right_height) {
        return left_root;
    } else {
        return right_root;
    }
}
689
690 #ifdef DEBUG
691 # define PROVOKE_RANDOM_SPLIT_JOIN
692 #endif
693 #ifdef PROVOKE_RANDOM_SPLIT_JOIN
/*
 * Small pseudo random number generator for debug builds: a linear
 * congruential generator (multiplier 214013, increment 2531011) with
 * its state in a function-local static variable. Returns bits 16..30
 * of the new state, i.e. a value in the range [0, 0x7FFF].
 *
 * The state is unsigned so that the multiplication wraps around with
 * well defined behavior (overflow of a signed int is undefined
 * behavior in C). The generated sequence is the one that a wrapping
 * 32-bit signed implementation produces.
 *
 * The state is shared by all callers without any synchronization.
 */
static int dbg_fastrand(void)
{
    static unsigned int g_seed = 648835;
    g_seed = 214013U * g_seed + 2531011U;
    return (int)((g_seed >> 16) & 0x7FFF);
}
700
/*
 * Debug aid that randomly pushes the lock statistics of a base node
 * towards a split or a join.
 *
 * Nothing is done if DB_CATREE_FORCE_SPLIT is set for the table or if
 * DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN is not set. Otherwise a number in
 * the range 0..7 is drawn: 1 adds ERL_DB_CATREE_HIGH_CONTENTION_LIMIT
 * + 1 to the statistics, 2 adds ERL_DB_CATREE_LOW_CONTENTION_LIMIT - 1,
 * and all other outcomes leave the statistics untouched.
 */
static void dbg_provoke_random_splitjoin(DbTableCATree* tb,
                                         DbTableCATreeNode* base_node)
{
    int roll;

    if (tb->common.status & DB_CATREE_FORCE_SPLIT) {
        return;
    }
    if (!(tb->common.status & DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN)) {
        return;
    }

    roll = dbg_fastrand() % 8;
    if (roll == 1) {
        BASE_NODE_STAT_ADD(base_node, 1+ERL_DB_CATREE_HIGH_CONTENTION_LIMIT);
    } else if (roll == 2) {
        BASE_NODE_STAT_SUB(base_node, -1+ERL_DB_CATREE_LOW_CONTENTION_LIMIT);
    }
}
717 #else
718 # define dbg_provoke_random_splitjoin(T,N)
719 #endif /* PROVOKE_RANDOM_SPLIT_JOIN */
720
721 static ERTS_NOINLINE
do_random_join(DbTableCATree * tb,Uint rand)722 void do_random_join(DbTableCATree* tb, Uint rand)
723 {
724 DbTableCATreeNode* node = GET_ROOT_ACQB(tb);
725 DbTableCATreeNode* parent = NULL;
726 int level = 0;
727 Sint stat;
728 while (!node->is_base_node) {
729 parent = node;
730 if ((rand & (1 << level)) == 0) {
731 node = GET_LEFT_ACQB(node);
732 } else {
733 node = GET_RIGHT_ACQB(node);
734 }
735 level++;
736 }
737 BASE_NODE_STAT_SUB(node, ERL_DB_CATREE_LOCK_GRAVITY_CONTRIBUTION);
738 stat = BASE_NODE_STAT_READ(node);
739 if (stat >= ERL_DB_CATREE_LOW_CONTENTION_LIMIT &&
740 stat <= ERL_DB_CATREE_HIGH_CONTENTION_LIMIT) {
741 return; /* No adaptation */
742 }
743 if (parent != NULL && !try_wlock_base_node(&node->u.base)) {
744 if (!node->u.base.is_valid) {
745 wunlock_base_node(node);
746 return;
747 }
748 wunlock_adapt_base_node(tb, node, parent, level);
749 }
750 }
751
752 static ERTS_INLINE
do_random_join_with_low_probability(DbTableCATree * tb,Uint seed)753 void do_random_join_with_low_probability(DbTableCATree* tb, Uint seed)
754 {
755 #ifndef ERTS_DB_CA_TREE_NO_RANDOM_JOIN_WITH_LOW_PROBABILITY
756 Uint32 rand = erts_sched_local_random(seed);
757 if (((rand & ERL_DB_CATREE_LOCK_GRAVITY_PATTERN)) == 0) {
758 do_random_join(tb, rand);
759 }
760 #endif
761 }
762
763 static ERTS_INLINE
try_wlock_base_node(DbTableCATreeBaseNode * base_node)764 int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
765 {
766 return EBUSY == erts_rwmtx_tryrwlock(&base_node->lock);
767 }
768
769 /*
770 * Locks a base node without adjusting the lock statistics
771 */
772 static ERTS_INLINE
wlock_base_node_no_stats(DbTableCATreeNode * base_node)773 void wlock_base_node_no_stats(DbTableCATreeNode *base_node)
774 {
775 ASSERT(base_node->is_base_node);
776 erts_rwmtx_rwlock(&base_node->u.base.lock);
777 }
778
779 /*
780 * Locks a base node and adjusts the lock statistics according to if
781 * the lock was contended or not
782 */
783 static ERTS_INLINE
wlock_base_node(DbTableCATreeNode * base_node)784 void wlock_base_node(DbTableCATreeNode *base_node)
785 {
786 ASSERT(base_node->is_base_node);
787 if (try_wlock_base_node(&base_node->u.base)) {
788 /* The lock is contended */
789 wlock_base_node_no_stats(base_node);
790 BASE_NODE_STAT_ADD(base_node, ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION);
791 } else {
792 BASE_NODE_STAT_SUB(base_node, ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION);
793 }
794 }
795
796 static ERTS_INLINE
wunlock_base_node(DbTableCATreeNode * base_node)797 void wunlock_base_node(DbTableCATreeNode *base_node)
798 {
799 erts_rwmtx_rwunlock(&base_node->u.base.lock);
800 }
801
802 static ERTS_INLINE
wunlock_adapt_base_node(DbTableCATree * tb,DbTableCATreeNode * node,DbTableCATreeNode * parent,int current_level)803 void wunlock_adapt_base_node(DbTableCATree* tb,
804 DbTableCATreeNode* node,
805 DbTableCATreeNode* parent,
806 int current_level)
807 {
808 Sint base_node_lock_stat = BASE_NODE_STAT_READ(node);
809 dbg_provoke_random_splitjoin(tb,node);
810 if ((!node->u.base.root && parent && !(tb->common.status
811 & DB_CATREE_FORCE_SPLIT))
812 || base_node_lock_stat < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) {
813 join_catree(tb, node, parent);
814 }
815 else if (base_node_lock_stat > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT
816 && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) {
817 split_catree(tb, node, parent);
818 }
819 else {
820 wunlock_base_node(node);
821 }
822 }
823
824 static ERTS_INLINE
rlock_base_node(DbTableCATreeNode * base_node)825 void rlock_base_node(DbTableCATreeNode *base_node)
826 {
827 ASSERT(base_node->is_base_node);
828 if (EBUSY == erts_rwmtx_tryrlock(&base_node->u.base.lock)) {
829 /* The lock is contended */
830 BASE_NODE_STAT_ADD(base_node, ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION);
831 erts_rwmtx_rlock(&base_node->u.base.lock);
832 }
833 }
834
835 static ERTS_INLINE
runlock_base_node(DbTableCATreeNode * base_node,DbTableCATree * tb)836 void runlock_base_node(DbTableCATreeNode *base_node, DbTableCATree* tb)
837 {
838 ASSERT(base_node->is_base_node);
839 erts_rwmtx_runlock(&base_node->u.base.lock);
840 do_random_join_with_low_probability(tb, (Uint)base_node);
841 }
842
843 static ERTS_INLINE
runlock_base_node_no_rand(DbTableCATreeNode * base_node)844 void runlock_base_node_no_rand(DbTableCATreeNode *base_node)
845 {
846 ASSERT(base_node->is_base_node);
847 erts_rwmtx_runlock(&base_node->u.base.lock);
848 }
849
850 static ERTS_INLINE
lock_route_node(DbTableCATreeNode * route_node)851 void lock_route_node(DbTableCATreeNode *route_node)
852 {
853 ASSERT(!route_node->is_base_node);
854 erts_mtx_lock(&route_node->u.route.lock);
855 }
856
857 static ERTS_INLINE
unlock_route_node(DbTableCATreeNode * route_node)858 void unlock_route_node(DbTableCATreeNode *route_node)
859 {
860 ASSERT(!route_node->is_base_node);
861 erts_mtx_unlock(&route_node->u.route.lock);
862 }
863
864 static ERTS_INLINE
copy_route_key(DbRouteKey * dst,Eterm key,Uint key_size)865 Eterm copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
866 {
867 dst->size = key_size;
868 if (key_size != 0) {
869 Eterm* hp = &dst->heap[0];
870 ErlOffHeap tmp_offheap;
871 tmp_offheap.first = NULL;
872 dst->term = copy_struct(key, key_size, &hp, &tmp_offheap);
873 dst->oh = tmp_offheap.first;
874 }
875 else {
876 ASSERT(is_immed(key));
877 dst->term = key;
878 dst->oh = NULL;
879 }
880 return dst->term;
881 }
882
883 static ERTS_INLINE
destroy_route_key(DbRouteKey * key)884 void destroy_route_key(DbRouteKey* key)
885 {
886 if (key->oh) {
887 ErlOffHeap oh;
888 oh.first = key->oh;
889 erts_cleanup_offheap(&oh);
890 }
891 }
892
893 static ERTS_INLINE
init_root_iterator(DbTableCATree * tb,CATreeRootIterator * iter,int read_only)894 void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter,
895 int read_only)
896 {
897 iter->tb = tb;
898 iter->read_only = read_only;
899 iter->locked_bnode = NULL;
900 iter->next_route_key = THE_NON_VALUE;
901 iter->search_key = NULL;
902 }
903
904 static ERTS_INLINE
lock_iter_base_node(CATreeRootIterator * iter,DbTableCATreeNode * base_node,DbTableCATreeNode * parent,int current_level)905 void lock_iter_base_node(CATreeRootIterator* iter,
906 DbTableCATreeNode *base_node,
907 DbTableCATreeNode *parent,
908 int current_level)
909 {
910 ASSERT(!iter->locked_bnode);
911 if (iter->read_only)
912 rlock_base_node(base_node);
913 else {
914 wlock_base_node(base_node);
915 iter->bnode_parent = parent;
916 iter->bnode_level = current_level;
917 }
918 iter->locked_bnode = base_node;
919 }
920
921 static ERTS_INLINE
unlock_iter_base_node(CATreeRootIterator * iter)922 void unlock_iter_base_node(CATreeRootIterator* iter)
923 {
924 ASSERT(iter->locked_bnode);
925 if (iter->read_only)
926 runlock_base_node(iter->locked_bnode, iter->tb);
927 else if (iter->locked_bnode->u.base.is_valid) {
928 wunlock_adapt_base_node(iter->tb, iter->locked_bnode,
929 iter->bnode_parent, iter->bnode_level);
930 }
931 else
932 wunlock_base_node(iter->locked_bnode);
933 iter->locked_bnode = NULL;
934 }
935
936 static ERTS_INLINE
destroy_root_iterator(CATreeRootIterator * iter)937 void destroy_root_iterator(CATreeRootIterator* iter)
938 {
939 if (iter->locked_bnode)
940 unlock_iter_base_node(iter);
941 if (iter->search_key) {
942 destroy_route_key(iter->search_key);
943 erts_free(ERTS_ALC_T_DB_TMP, iter->search_key);
944 }
945 }
946
/*
 * Optional extra result from find_base_node(): where in the tree the
 * base node was found.
 */
typedef struct
{
    DbTableCATreeNode *parent; /* the last route node passed on the way
                                  down, or NULL if the root is a base node */
    int current_level;         /* the number of route nodes passed on the
                                  way down */
} FindBaseNode;
952
953 static ERTS_INLINE
find_base_node(DbTableCATree * tb,Eterm key,FindBaseNode * fbn)954 DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key,
955 FindBaseNode* fbn)
956 {
957 DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb);
958 if (fbn) {
959 fbn->parent = NULL;
960 fbn->current_level = 0;
961 }
962 while (!node->is_base_node) {
963 if (fbn) {
964 fbn->current_level++;
965 fbn->parent = node;
966 }
967 if (cmp_key_route(key, node) < 0) {
968 node = GET_LEFT_ACQB(node);
969 } else {
970 node = GET_RIGHT_ACQB(node);
971 }
972 }
973 return node;
974 }
975
976 static ERTS_INLINE
find_rlock_valid_base_node(DbTableCATree * tb,Eterm key)977 DbTableCATreeNode* find_rlock_valid_base_node(DbTableCATree* tb, Eterm key)
978 {
979 DbTableCATreeNode* base_node;
980
981 while (1) {
982 base_node = find_base_node(tb, key, NULL);
983 rlock_base_node(base_node);
984 if (base_node->u.base.is_valid)
985 break;
986 runlock_base_node_no_rand(base_node);
987 }
988 return base_node;
989 }
990
991 static ERTS_INLINE
find_wlock_valid_base_node(DbTableCATree * tb,Eterm key,FindBaseNode * fbn)992 DbTableCATreeNode* find_wlock_valid_base_node(DbTableCATree* tb, Eterm key,
993 FindBaseNode* fbn)
994 {
995 DbTableCATreeNode* base_node;
996
997 while (1) {
998 base_node = find_base_node(tb, key, fbn);
999 wlock_base_node(base_node);
1000 if (base_node->u.base.is_valid)
1001 break;
1002 wunlock_base_node(base_node);
1003 }
1004 return base_node;
1005 }
1006
/*
 * LC_ORDER(ORDER) expands to ORDER when the lock checker is compiled in
 * and to NIL otherwise (the argument is then discarded).
 */
#ifndef ERTS_ENABLE_LOCK_CHECK
#  define LC_ORDER(ORDER) NIL
#else
#  define LC_ORDER(ORDER) ORDER
#endif
1012
/* Byte size of a base node: the node struct up to the end of u.base. */
#define sizeof_base_node() \
    (offsetof(DbTableCATreeNode, u.base.end_of_struct__))
1015
create_base_node(DbTableCATree * tb,TreeDbTerm * root)1016 static DbTableCATreeNode *create_base_node(DbTableCATree *tb,
1017 TreeDbTerm* root)
1018 {
1019 DbTableCATreeNode *p;
1020 erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
1021 p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, (DbTable *) tb,
1022 sizeof_base_node());
1023
1024 p->is_base_node = 1;
1025 p->u.base.root = root;
1026 if (tb->common.type & DB_FREQ_READ)
1027 rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
1028 if (erts_ets_rwmtx_spin_count >= 0)
1029 rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
1030
1031 erts_rwmtx_init_opt(&p->u.base.lock, &rwmtx_opt,
1032 "erl_db_catree_base_node",
1033 NIL,
1034 ERTS_LOCK_FLAGS_CATEGORY_DB);
1035 BASE_NODE_STAT_SET(p, ((tb->common.status & DB_CATREE_FORCE_SPLIT)
1036 ? INT_MAX : 0));
1037 p->u.base.is_valid = 1;
1038 return p;
1039 }
1040
sizeof_route_node(Uint key_size)1041 static ERTS_INLINE Uint sizeof_route_node(Uint key_size)
1042 {
1043 return (offsetof(DbTableCATreeNode, u.route.key.heap)
1044 + key_size*sizeof(Eterm));
1045 }
1046
1047 static DbTableCATreeNode*
create_route_node(DbTableCATree * tb,DbTableCATreeNode * left,DbTableCATreeNode * right,DbTerm * keyTerm,DbTableCATreeNode * lc_parent)1048 create_route_node(DbTableCATree *tb,
1049 DbTableCATreeNode *left,
1050 DbTableCATreeNode *right,
1051 DbTerm * keyTerm,
1052 DbTableCATreeNode* lc_parent)
1053 {
1054 Eterm key = GETKEY(tb,keyTerm->tpl);
1055 int key_size = size_object(key);
1056 DbTableCATreeNode* p = erts_db_alloc(ERTS_ALC_T_DB_TABLE,
1057 (DbTable *) tb,
1058 sizeof_route_node(key_size));
1059
1060 copy_route_key(&p->u.route.key, key, key_size);
1061 p->is_base_node = 0;
1062 p->u.route.is_valid = 1;
1063 erts_atomic_init_nob(&p->u.route.left, (erts_aint_t)left);
1064 erts_atomic_init_nob(&p->u.route.right, (erts_aint_t)right);
1065 #ifdef ERTS_ENABLE_LOCK_CHECK
1066 /* Route node lock order is inverse tree depth (from leafs toward root) */
1067 p->u.route.lc_order = (lc_parent == NULL ? MAX_SMALL :
1068 lc_parent->u.route.lc_order - 1);
1069 /*
1070 * This assert may eventually fail as we don't increase 'lc_order' in join
1071 * operations when route nodes move up in the tree.
1072 * Tough luck if you run a lock-checking VM for such a long time on 32-bit.
1073 */
1074 ERTS_LC_ASSERT(p->u.route.lc_order >= 0);
1075 #endif
1076 erts_mtx_init(&p->u.route.lock, "erl_db_catree_route_node",
1077 LC_ORDER(make_small(p->u.route.lc_order)),
1078 ERTS_LOCK_FLAGS_CATEGORY_DB);
1079 return p;
1080 }
1081
do_free_base_node(void * vptr)1082 static void do_free_base_node(void* vptr)
1083 {
1084 DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
1085 ASSERT(p->is_base_node);
1086 erts_rwmtx_destroy(&p->u.base.lock);
1087 erts_free(ERTS_ALC_T_DB_TABLE, p);
1088 }
1089
/*
 * Free base node 'p' immediately: first update the memory accounting of
 * 'tb' for a base-node-sized block going to size 0, then release the node.
 */
static void free_catree_base_node(DbTableCATree* tb, DbTableCATreeNode* p)
{
    ASSERT(p->is_base_node);

    ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(), 0);
    do_free_base_node(p);
}
1096
do_free_route_node(void * vptr)1097 static void do_free_route_node(void *vptr)
1098 {
1099 DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
1100 ASSERT(!p->is_base_node);
1101 erts_mtx_destroy(&p->u.route.lock);
1102 destroy_route_key(&p->u.route.key);
1103 erts_free(ERTS_ALC_T_DB_TABLE, p);
1104 }
1105
/*
 * Free route node 'p' immediately: first update the memory accounting of
 * 'tb' for the node's size (which depends on its key size), then release
 * the node.
 */
static void free_catree_route_node(DbTableCATree* tb, DbTableCATreeNode* p)
{
    ASSERT(!p->is_base_node);

    ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_route_node(p->u.route.key.size), 0);
    do_free_route_node(p);
}
1112
1113
1114 /*
1115 * Returns the parent routing node of the specified
1116 * route node 'child' if such a parent exists
1117 * or NULL if 'child' is attached to the root.
1118 */
1119 static ERTS_INLINE DbTableCATreeNode *
parent_of(DbTableCATree * tb,DbTableCATreeNode * child)1120 parent_of(DbTableCATree *tb,
1121 DbTableCATreeNode *child)
1122 {
1123 Eterm key = GET_ROUTE_NODE_KEY(child);
1124 DbTableCATreeNode *current = GET_ROOT_ACQB(tb);
1125 DbTableCATreeNode *prev = NULL;
1126
1127 while (current != child) {
1128 prev = current;
1129 if (cmp_key_route(key, current) < 0) {
1130 current = GET_LEFT_ACQB(current);
1131 } else {
1132 current = GET_RIGHT_ACQB(current);
1133 }
1134 }
1135 return prev;
1136 }
1137
1138
1139 static ERTS_INLINE DbTableCATreeNode *
leftmost_base_node(DbTableCATreeNode * root)1140 leftmost_base_node(DbTableCATreeNode *root)
1141 {
1142 DbTableCATreeNode *node = root;
1143 while (!node->is_base_node) {
1144 node = GET_LEFT_ACQB(node);
1145 }
1146 return node;
1147 }
1148
1149
1150 static ERTS_INLINE DbTableCATreeNode *
rightmost_base_node(DbTableCATreeNode * root)1151 rightmost_base_node(DbTableCATreeNode *root)
1152 {
1153 DbTableCATreeNode *node = root;
1154 while (!node->is_base_node) {
1155 node = GET_RIGHT_ACQB(node);
1156 }
1157 return node;
1158 }
1159
1160
1161 static ERTS_INLINE DbTableCATreeNode *
leftmost_route_node(DbTableCATreeNode * root)1162 leftmost_route_node(DbTableCATreeNode *root)
1163 {
1164 DbTableCATreeNode *node = root;
1165 DbTableCATreeNode *prev_node = NULL;
1166 while (!node->is_base_node) {
1167 prev_node = node;
1168 node = GET_LEFT_ACQB(node);
1169 }
1170 return prev_node;
1171 }
1172
1173 static ERTS_INLINE DbTableCATreeNode*
rightmost_route_node(DbTableCATreeNode * root)1174 rightmost_route_node(DbTableCATreeNode *root)
1175 {
1176 DbTableCATreeNode * node = root;
1177 DbTableCATreeNode * prev_node = NULL;
1178 while (!node->is_base_node) {
1179 prev_node = node;
1180 node = GET_RIGHT_ACQB(node);
1181 }
1182 return prev_node;
1183 }
1184
1185 static ERTS_INLINE
init_tree_stack(DbTreeStack * stack,TreeDbTerm ** stack_array,Uint init_slot)1186 void init_tree_stack(DbTreeStack *stack,
1187 TreeDbTerm **stack_array,
1188 Uint init_slot)
1189 {
1190 stack->array = stack_array;
1191 stack->pos = 0;
1192 stack->slot = init_slot;
1193 }
1194
/*
 * Try to join base node 'thiz' with its neighbouring base node, removing
 * the route node 'parent' that separates them.
 *
 * tb     - the table.
 * thiz   - base node to join; it is write-unlocked on every path out of
 *          this function, so the caller is presumably expected to hold
 *          its write lock on entry.
 * parent - route node directly above 'thiz', or NULL if 'thiz' is the
 *          only node in the tree.
 *
 * The join is abandoned (statistic of 'thiz' reset to 0, locks released)
 * when there is no parent, when the neighbour's lock cannot be taken
 * without blocking, or when the neighbour turns out to be invalid.
 *
 * Otherwise 'parent', 'thiz' and the neighbour are marked invalid,
 * 'parent' is unlinked by pointing its own parent (or the table root) at
 * the sibling subtree, a new base node holding the joined trees replaces
 * the neighbour, and the three removed nodes are scheduled for freeing.
 */
static void join_catree(DbTableCATree *tb,
                        DbTableCATreeNode *thiz,
                        DbTableCATreeNode *parent)
{
    DbTableCATreeNode *gparent;
    DbTableCATreeNode *neighbor;
    DbTableCATreeNode *new_neighbor;
    DbTableCATreeNode *neighbor_parent;

    ASSERT(thiz->is_base_node);
    if (parent == NULL) {
        /* Nothing to join with */
        BASE_NODE_STAT_SET(thiz, 0);
        wunlock_base_node(thiz);
        return;
    }
    ASSERT(!parent->is_base_node);
    if (GET_LEFT(parent) == thiz) {
        /* Neighbor is the leftmost base node of the right sibling subtree */
        neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
        if (try_wlock_base_node(&neighbor->u.base)) {
            /* Failed to acquire lock */
            BASE_NODE_STAT_SET(thiz, 0);
            wunlock_base_node(thiz);
            return;
        } else if (!neighbor->u.base.is_valid) {
            BASE_NODE_STAT_SET(thiz, 0);
            wunlock_base_node(thiz);
            wunlock_base_node(neighbor);
            return;
        } else {
            lock_route_node(parent);
            parent->u.route.is_valid = 0;
            neighbor->u.base.is_valid = 0;
            thiz->u.base.is_valid = 0;
            /* Find and lock a valid grandparent; retry while the one
             * found has been invalidated. */
            gparent = NULL;
            do {
                if (gparent != NULL) {
                    unlock_route_node(gparent);
                }
                gparent = parent_of(tb, parent);
                if (gparent != NULL) {
                    lock_route_node(gparent);
                }
            } while (gparent != NULL && !gparent->u.route.is_valid);

            /* Unlink 'parent': its right subtree takes its place */
            if (gparent == NULL) {
                SET_ROOT_RELB(tb, GET_RIGHT(parent));
            } else if (GET_LEFT(gparent) == parent) {
                SET_LEFT_RELB(gparent, GET_RIGHT(parent));
            } else {
                SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
            }
            unlock_route_node(parent);
            if (gparent != NULL) {
                unlock_route_node(gparent);
            }
            {
                TreeDbTerm* new_root = join_trees(thiz->u.base.root,
                                                  neighbor->u.base.root);
                new_neighbor = create_base_node(tb, new_root);
            }
            if (GET_RIGHT(parent) == neighbor) {
                neighbor_parent = gparent;
            } else {
                neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
            }
        }
    } else { /* Symmetric case */
        ASSERT(GET_RIGHT(parent) == thiz);
        /* Neighbor is the rightmost base node of the left sibling subtree */
        neighbor = rightmost_base_node(GET_LEFT_ACQB(parent));
        if (try_wlock_base_node(&neighbor->u.base)) {
            /* Failed to acquire lock */
            BASE_NODE_STAT_SET(thiz, 0);
            wunlock_base_node(thiz);
            return;
        } else if (!neighbor->u.base.is_valid) {
            BASE_NODE_STAT_SET(thiz, 0);
            wunlock_base_node(thiz);
            wunlock_base_node(neighbor);
            return;
        } else {
            lock_route_node(parent);
            parent->u.route.is_valid = 0;
            neighbor->u.base.is_valid = 0;
            thiz->u.base.is_valid = 0;
            /* Find and lock a valid grandparent; retry while the one
             * found has been invalidated. */
            gparent = NULL;
            do {
                if (gparent != NULL) {
                    unlock_route_node(gparent);
                }
                gparent = parent_of(tb, parent);
                if (gparent != NULL) {
                    lock_route_node(gparent);
                }
            } while (gparent != NULL && !gparent->u.route.is_valid);

            /* Unlink 'parent': its left subtree takes its place */
            if (gparent == NULL) {
                SET_ROOT_RELB(tb, GET_LEFT(parent));
            } else if (GET_RIGHT(gparent) == parent) {
                SET_RIGHT_RELB(gparent, GET_LEFT(parent));
            } else {
                SET_LEFT_RELB(gparent, GET_LEFT(parent));
            }
            unlock_route_node(parent);
            if (gparent != NULL) {
                unlock_route_node(gparent);
            }
            {
                TreeDbTerm* new_root = join_trees(neighbor->u.base.root,
                                                  thiz->u.base.root);
                new_neighbor = create_base_node(tb, new_root);
            }
            if (GET_LEFT(parent) == neighbor) {
                neighbor_parent = gparent;
            } else {
                neighbor_parent =
                    rightmost_route_node(GET_LEFT(parent));
            }
        }
    }
    /* Link in new neighbor and free nodes that are no longer in the tree */
    if (neighbor_parent == NULL) {
        SET_ROOT_RELB(tb, new_neighbor);
    } else if (GET_LEFT(neighbor_parent) == neighbor) {
        SET_LEFT_RELB(neighbor_parent, new_neighbor);
    } else {
        SET_RIGHT_RELB(neighbor_parent, new_neighbor);
    }
    wunlock_base_node(thiz);
    wunlock_base_node(neighbor);
    /* Free the parent and base */
    erts_schedule_db_free(&tb->common,
                          do_free_route_node,
                          parent,
                          &parent->u.route.free_item,
                          sizeof_route_node(parent->u.route.key.size));
    erts_schedule_db_free(&tb->common,
                          do_free_base_node,
                          thiz,
                          &thiz->u.base.free_item,
                          sizeof_base_node());
    erts_schedule_db_free(&tb->common,
                          do_free_base_node,
                          neighbor,
                          &neighbor->u.base.free_item,
                          sizeof_base_node());
}
1341
/*
 * Split base node 'base' into two base nodes under a new route node.
 *
 * tb     - the table.
 * base   - base node to split; it is write-unlocked on every path.
 * parent - route node directly above 'base', or NULL if 'base' is the
 *          table root.
 *
 * If the tree of 'base' has fewer than two elements no split is made;
 * the statistic is reset to 0 unless the table has DB_CATREE_FORCE_SPLIT
 * set. Otherwise the new route node replaces 'base' in the tree, 'base'
 * is marked invalid and scheduled for freeing.
 */
static void split_catree(DbTableCATree *tb,
                         DbTableCATreeNode* ERTS_RESTRICT base,
                         DbTableCATreeNode* ERTS_RESTRICT parent)
{
    TreeDbTerm *split_term;
    TreeDbTerm *lower_tree;
    TreeDbTerm *upper_tree;
    DbTableCATreeNode* ERTS_RESTRICT lower_base;
    DbTableCATreeNode* ERTS_RESTRICT upper_base;
    DbTableCATreeNode* ERTS_RESTRICT route;

    if (less_than_two_elements(base->u.base.root)) {
        /* Too small to split */
        if (!(tb->common.status & DB_CATREE_FORCE_SPLIT)) {
            BASE_NODE_STAT_SET(base, 0);
        }
        wunlock_base_node(base);
        return;
    }

    split_tree(tb, base->u.base.root, &split_term,
               &lower_tree, &upper_tree);

    lower_base = create_base_node(tb, lower_tree);
    upper_base = create_base_node(tb, upper_tree);
    route = create_route_node(tb,
                              lower_base,
                              upper_base,
                              &split_term->dbterm,
                              parent);

    /* Publish the new subtree in place of 'base' */
    if (parent == NULL) {
        SET_ROOT_RELB(tb, route);
    } else if (GET_LEFT(parent) == base) {
        SET_LEFT_RELB(parent, route);
    } else {
        SET_RIGHT_RELB(parent, route);
    }

    base->u.base.is_valid = 0;
    wunlock_base_node(base);
    erts_schedule_db_free(&tb->common,
                          do_free_base_node,
                          base,
                          &base->u.base.free_item,
                          sizeof_base_node());
}
1386
1387 /** @brief Free the entire catree and its sub-trees.
1388 *
1389 * @param reds Reductions to spend.
1390 * @return Reductions left. Negative value if not done.
1391 */
db_free_table_continue_catree(DbTable * tbl,SWord reds)1392 static SWord db_free_table_continue_catree(DbTable *tbl, SWord reds)
1393 {
1394 DbTableCATree *tb = &tbl->catree;
1395 DbTableCATreeNode *node;
1396 DbTableCATreeNode *parent;
1397 CATreeNodeStack rnode_stack;
1398 DbTableCATreeNode *rnode_stack_array[STACK_NEED];
1399
1400 if (!tb->deletion) {
1401 /* First call */
1402 tb->deletion = 1;
1403 tb->nr_of_deleted_items = 0;
1404 }
1405
1406 /*
1407 * The route tree is traversed and freed while keeping it consistent
1408 * during yields.
1409 */
1410 rnode_stack.array = rnode_stack_array;
1411 rnode_stack.pos = 0;
1412 rnode_stack.size = STACK_NEED;
1413
1414 node = GET_ROOT(tb);
1415 if (node->is_base_node) {
1416 if (node->u.base.root) {
1417 reds = do_delete_base_node_cont(tb, node, reds);
1418 if (reds < 0)
1419 return reds; /* Yield */
1420 }
1421 free_catree_base_node(tb, node);
1422 }
1423 else {
1424 for (;;) {
1425 DbTableCATreeNode* left = GET_LEFT(node);
1426 DbTableCATreeNode* right = GET_RIGHT(node);
1427
1428 if (!left->is_base_node) {
1429 PUSH_NODE(&rnode_stack, node);
1430 node = left;
1431 }
1432 else if (!right->is_base_node) {
1433 PUSH_NODE(&rnode_stack, node);
1434 node = right;
1435 }
1436 else {
1437 if (left->u.base.root) {
1438 reds = do_delete_base_node_cont(tb, left, reds);
1439 if (reds < 0)
1440 return reds; /* Yield */
1441 }
1442 if (right->u.base.root) {
1443 reds = do_delete_base_node_cont(tb, right, reds);
1444 if (reds < 0)
1445 return reds; /* Yield */
1446 }
1447
1448 free_catree_base_node(tb, right);
1449 free_catree_route_node(tb, node);
1450 /*
1451 * Keep empty left base node to join with its grandparent
1452 * for tree consistency during yields.
1453 */
1454
1455 parent = POP_NODE(&rnode_stack);
1456 if (parent) {
1457 if (node == GET_LEFT(parent)) {
1458 SET_LEFT(parent, left);
1459 }
1460 else {
1461 ASSERT(node == GET_RIGHT(parent));
1462 SET_RIGHT(parent, left);
1463 }
1464
1465 reds -= 2;
1466 if (reds < 0)
1467 return reds; /* Yield */
1468
1469 node = parent;
1470 }
1471 else { /* Done */
1472 free_catree_base_node(tb, left);
1473 break;
1474 }
1475 }
1476 }
1477 }
1478
1479 ASSERT(reds >= 0);
1480 SET_ROOT(tb, NULL);
1481 return reds;
1482 }
1483
1484 /** @brief Free all objects of a base node, but keep the base node.
1485 *
1486 * @param reds Reductions to spend.
1487 * @return Reductions left. Negative value if not done.
1488 */
do_delete_base_node_cont(DbTableCATree * tb,DbTableCATreeNode * base_node,SWord reds)1489 static SWord do_delete_base_node_cont(DbTableCATree *tb,
1490 DbTableCATreeNode *base_node,
1491 SWord reds)
1492 {
1493 TreeDbTerm *p;
1494 DbTreeStack stack;
1495 TreeDbTerm* stack_array[STACK_NEED];
1496
1497 stack.pos = 0;
1498 stack.array = stack_array;
1499
1500 p = base_node->u.base.root;
1501 for (;;) {
1502 if (p->left) {
1503 PUSH_NODE(&stack, p);
1504 p = p->left;
1505 }
1506 else if (p->right) {
1507 PUSH_NODE(&stack, p);
1508 p = p->right;
1509 }
1510 else {
1511 TreeDbTerm *parent;
1512
1513 DEC_NITEMS((DbTable*)tb);
1514 tb->nr_of_deleted_items++;
1515 free_term((DbTable*)tb, p);
1516
1517 parent = POP_NODE(&stack);
1518 if (!parent)
1519 break;
1520 if (parent->left == p)
1521 parent->left = NULL;
1522 else {
1523 ASSERT(parent->right == p);
1524 parent->right = NULL;
1525 }
1526 if (--reds < 0)
1527 return reds; /* Yield */
1528 p = parent;
1529 }
1530 }
1531 base_node->u.base.root = NULL;
1532 return reds;
1533 }
1534
1535
/*
** Initialization function
**
** Intentionally empty: nothing is set up here.
*/

void db_initialize_catree(void)
{
}
1544
1545 /*
1546 ** Table interface routines (i.e., what's called by the bif's)
1547 */
1548
/*
 * Initialise a newly created catree table: the tree starts out as a
 * single empty base node and the deletion state is cleared. In DEBUG
 * builds DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN is set in the table status.
 *
 * 'p' is not used. Always returns DB_ERROR_NONE.
 */
int db_create_catree(Process *p, DbTable *tbl)
{
    DbTableCATree *catree = &tbl->catree;
    DbTableCATreeNode *first_base = create_base_node(catree, NULL);

    catree->deletion = 0;
    catree->nr_of_deleted_items = 0;
#ifdef DEBUG
    tbl->common.status |= DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN;
#endif
    erts_atomic_init_relb(&(catree->root), (erts_aint_t)first_base);
    return DB_ERROR_NONE;
}
1563
/*
 * Look up the first key of the table via db_first_tree_common().
 *
 * Starts at the first base node; if that one is empty the iterator is
 * advanced to the next root it reports (or NULL if there is none).
 * The result code of db_first_tree_common() is returned.
 */
static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret)
{
    CATreeRootIterator iter;
    TreeDbTerm **rootp;
    TreeDbTerm *root;
    int rc;

    init_root_iterator(&tbl->catree, &iter, 1);

    rootp = catree_find_first_root(&iter);
    root = *rootp;
    if (root == NULL) {
        rootp = catree_find_next_root(&iter, NULL);
        root = (rootp != NULL) ? *rootp : NULL;
    }

    rc = db_first_tree_common(p, tbl, root, ret, NULL);

    destroy_root_iterator(&iter);
    return rc;
}
1582
/*
 * Look up the key following 'key' via db_next_tree_common().
 *
 * The search starts in the root found for 'key' and moves on to the next
 * root for as long as the lookup succeeds with am_EOT and another root
 * exists. The last result code of db_next_tree_common() is returned.
 */
static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    TreeDbTerm *slots[STACK_NEED];
    DbTreeStack stack;
    CATreeRootIterator iter;
    TreeDbTerm **rootp;
    int rc;

    init_root_iterator(&tbl->catree, &iter, 1);
    iter.next_route_key = key;
    rootp = catree_find_next_root(&iter, NULL);

    for (;;) {
        init_tree_stack(&stack, slots, 0);
        rc = db_next_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret,
                                 &stack);
        if (rc != DB_ERROR_NONE || *ret != am_EOT) {
            break;
        }
        /* Nothing in this root; try the next one */
        rootp = catree_find_next_root(&iter, NULL);
        if (!rootp) {
            break;
        }
    }

    destroy_root_iterator(&iter);
    return rc;
}
1607
/*
 * Look up the last key of the table via db_last_tree_common().
 *
 * Starts at the last base node; if that one is empty the iterator is
 * moved to the previous root it reports (or NULL if there is none).
 * The result code of db_last_tree_common() is returned.
 */
static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
{
    CATreeRootIterator iter;
    TreeDbTerm **rootp;
    TreeDbTerm *root;
    int rc;

    init_root_iterator(&tbl->catree, &iter, 1);

    rootp = catree_find_last_root(&iter);
    root = *rootp;
    if (root == NULL) {
        rootp = catree_find_prev_root(&iter, NULL);
        root = (rootp != NULL) ? *rootp : NULL;
    }

    rc = db_last_tree_common(p, tbl, root, ret, NULL);

    destroy_root_iterator(&iter);
    return rc;
}
1626
/*
 * Look up the key preceding 'key' via db_prev_tree_common().
 *
 * The search starts in the root found for 'key' and moves on to the
 * previous root for as long as the lookup succeeds with am_EOT and
 * another root exists. The last result code of db_prev_tree_common() is
 * returned.
 */
static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    TreeDbTerm *slots[STACK_NEED];
    DbTreeStack stack;
    CATreeRootIterator iter;
    TreeDbTerm **rootp;
    int rc;

    init_root_iterator(&tbl->catree, &iter, 1);
    iter.next_route_key = key;
    rootp = catree_find_prev_root(&iter, NULL);

    for (;;) {
        init_tree_stack(&stack, slots, 0);
        rc = db_prev_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret,
                                 &stack);
        if (rc != DB_ERROR_NONE || *ret != am_EOT) {
            break;
        }
        /* Nothing in this root; try the previous one */
        rootp = catree_find_prev_root(&iter, NULL);
        if (!rootp) {
            break;
        }
    }

    destroy_root_iterator(&iter);
    return rc;
}
1651
/*
 * Insert the already built term 'obj' (a TreeDbTerm) into the table.
 *
 * The valid base node for the term's key is write-locked, the insertion
 * is delegated to db_put_dbterm_tree_common(), and the node is released
 * with wunlock_adapt_base_node(). 'consumed_reds_p' is not used.
 * Returns the result of db_put_dbterm_tree_common().
 */
static int db_put_dbterm_catree(DbTable* tbl,
                                void* obj,
                                int key_clash_fail,
                                SWord *consumed_reds_p)
{
    DbTableCATree *catree = &tbl->catree;
    TreeDbTerm *new_term = obj;
    Eterm key = GETKEY(catree, new_term->dbterm.tpl);
    FindBaseNode where;
    DbTableCATreeNode* bnode;
    int rc;

    bnode = find_wlock_valid_base_node(catree, key, &where);
    rc = db_put_dbterm_tree_common(&catree->common,
                                   &bnode->u.base.root,
                                   new_term,
                                   key_clash_fail,
                                   NULL);
    wunlock_adapt_base_node(catree, bnode, where.parent, where.current_level);
    return rc;
}
1670
/*
 * Insert the tuple 'obj' into the table.
 *
 * The valid base node for the tuple's key is write-locked, the insertion
 * is delegated to db_put_tree_common(), and the node is released with
 * wunlock_adapt_base_node(). 'consumed_reds_p' is not used.
 * Returns the result of db_put_tree_common().
 */
static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail,
                         SWord *consumed_reds_p)
{
    DbTableCATree *catree = &tbl->catree;
    Eterm key = GETKEY(&catree->common, tuple_val(obj));
    FindBaseNode where;
    DbTableCATreeNode* bnode;
    int rc;

    bnode = find_wlock_valid_base_node(catree, key, &where);
    rc = db_put_tree_common(&catree->common, &bnode->u.base.root, obj,
                            key_clash_fail, NULL);
    wunlock_adapt_base_node(catree, bnode, where.parent, where.current_level);
    return rc;
}
1683
/*
 * Look up 'key' under a read lock on its base node and return the
 * result of db_get_tree_common().
 */
static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableCATree *catree = &tbl->catree;
    DbTableCATreeNode* bnode;
    int rc;

    bnode = find_rlock_valid_base_node(catree, key);
    rc = db_get_tree_common(p, &catree->common,
                            bnode->u.base.root,
                            key, ret, NULL);
    runlock_base_node(bnode, catree);
    return rc;
}
1694
catree_find_root(Eterm key,CATreeRootIterator * iter)1695 TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter)
1696 {
1697 FindBaseNode fbn;
1698 DbTableCATreeNode* base_node;
1699
1700 while (1) {
1701 base_node = find_base_node(iter->tb, key, &fbn);
1702 lock_iter_base_node(iter, base_node, fbn.parent, fbn.current_level);
1703 if (base_node->u.base.is_valid)
1704 break;
1705 unlock_iter_base_node(iter);
1706 }
1707 return &base_node->u.base.root;
1708 }
1709
/*
 * Make sure 'key' is held in storage owned by the iterator.
 *
 * Immediate terms, and a key that already is the iterator's saved term,
 * are returned unchanged. Otherwise any previously saved key is
 * destroyed, the buffer is (re)allocated if missing or too small, and
 * the result of copy_route_key() into that buffer is returned.
 */
static Eterm save_iter_search_key(CATreeRootIterator* iter, Eterm key)
{
    Uint words;

    if (is_immed(key)) {
        return key;
    }

    if (iter->search_key != NULL) {
        if (iter->search_key->term == key) {
            return key; /* already saved */
        }
        destroy_route_key(iter->search_key);
    }

    words = size_object(key);
    if (iter->search_key == NULL || iter->search_key->size < words) {
        /* No buffer yet, or the current one is too small */
        iter->search_key = erts_realloc(ERTS_ALC_T_DB_TMP,
                                        iter->search_key,
                                        (offsetof(DbRouteKey, heap)
                                         + words*sizeof(Eterm)));
    }
    return copy_route_key(iter->search_key, key, words);
}
1731
catree_find_nextprev_root(CATreeRootIterator * iter,int forward,Eterm * search_keyp)1732 TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter,
1733 int forward,
1734 Eterm *search_keyp)
1735 {
1736 #ifdef DEBUG
1737 DbTableCATreeNode *rejected_invalid = NULL;
1738 DbTableCATreeNode *rejected_empty = NULL;
1739 #endif
1740 DbTableCATreeNode *node;
1741 DbTableCATreeNode *parent;
1742 DbTableCATreeNode* next_route_node;
1743 Eterm route_key = iter->next_route_key;
1744 int current_level;
1745
1746 if (iter->locked_bnode) {
1747 if (search_keyp)
1748 *search_keyp = save_iter_search_key(iter, *search_keyp);
1749 unlock_iter_base_node(iter);
1750 }
1751
1752 if (is_non_value(route_key))
1753 return NULL;
1754
1755 while (1) {
1756 node = GET_ROOT_ACQB(iter->tb);
1757 current_level = 0;
1758 parent = NULL;
1759 next_route_node = NULL;
1760 while (!node->is_base_node) {
1761 current_level++;
1762 parent = node;
1763 if (forward) {
1764 if (cmp_key_route(route_key,node) < 0) {
1765 next_route_node = node;
1766 node = GET_LEFT_ACQB(node);
1767 } else {
1768 node = GET_RIGHT_ACQB(node);
1769 }
1770 }
1771 else {
1772 if (cmp_key_route(route_key,node) > 0) {
1773 next_route_node = node;
1774 node = GET_RIGHT_ACQB(node);
1775 } else {
1776 node = GET_LEFT_ACQB(node);
1777 }
1778 }
1779 }
1780 ASSERT(node != rejected_invalid);
1781 lock_iter_base_node(iter, node, parent, current_level);
1782 if (node->u.base.is_valid) {
1783 ASSERT(node != rejected_empty);
1784 if (node->u.base.root) {
1785 iter->next_route_key = (next_route_node ?
1786 next_route_node->u.route.key.term :
1787 THE_NON_VALUE);
1788 iter->locked_bnode = node;
1789 return &node->u.base.root;
1790 }
1791 if (!next_route_node) {
1792 unlock_iter_base_node(iter);
1793 return NULL;
1794 }
1795 route_key = next_route_node->u.route.key.term;
1796 IF_DEBUG(rejected_empty = node);
1797 }
1798 else
1799 IF_DEBUG(rejected_invalid = node);
1800
1801 /* Retry */
1802 unlock_iter_base_node(iter);
1803 }
1804 }
1805
catree_find_next_root(CATreeRootIterator * iter,Eterm * keyp)1806 TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter, Eterm* keyp)
1807 {
1808 return catree_find_nextprev_root(iter, 1, keyp);
1809 }
1810
catree_find_prev_root(CATreeRootIterator * iter,Eterm * keyp)1811 TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter, Eterm* keyp)
1812 {
1813 return catree_find_nextprev_root(iter, 0, keyp);
1814 }
1815
1816 /** @brief Find root of tree where object with smallest key of all larger than
1817 * partially bound key may reside. Can be used as a starting point for
1818 * a reverse iteration with pb_key.
1819 *
1820 * @param pb_key The partially bound key. Example {42, '$1'}
1821 * @param iter An initialized root iterator.
1822 *
1823 * @return Pointer to found root pointer. May not be NULL.
1824 */
catree_find_next_from_pb_key_root(Eterm pb_key,CATreeRootIterator * iter)1825 TreeDbTerm** catree_find_next_from_pb_key_root(Eterm pb_key,
1826 CATreeRootIterator* iter)
1827 {
1828 #ifdef DEBUG
1829 DbTableCATreeNode *rejected_base = NULL;
1830 #endif
1831 DbTableCATreeNode *node;
1832 DbTableCATreeNode *parent;
1833 DbTableCATreeNode* next_route_node;
1834 int current_level;
1835
1836 ASSERT(!iter->locked_bnode);
1837
1838 while (1) {
1839 node = GET_ROOT_ACQB(iter->tb);
1840 current_level = 0;
1841 parent = NULL;
1842 next_route_node = NULL;
1843 while (!node->is_base_node) {
1844 current_level++;
1845 parent = node;
1846 if (cmp_partly_bound(pb_key, GET_ROUTE_NODE_KEY(node)) >= 0) {
1847 next_route_node = node;
1848 node = GET_RIGHT_ACQB(node);
1849 } else {
1850 node = GET_LEFT_ACQB(node);
1851 }
1852 }
1853 ASSERT(node != rejected_base);
1854 lock_iter_base_node(iter, node, parent, current_level);
1855 if (node->u.base.is_valid) {
1856 iter->next_route_key = (next_route_node ?
1857 next_route_node->u.route.key.term :
1858 THE_NON_VALUE);
1859 return &node->u.base.root;
1860 }
1861 /* Retry */
1862 unlock_iter_base_node(iter);
1863 #ifdef DEBUG
1864 rejected_base = node;
1865 #endif
1866 }
1867 }
1868
1869 /** @brief Find root of tree where object with largest key of all smaller than
1870 * partially bound key may reside. Can be used as a starting point for
1871 * a forward iteration with pb_key.
1872 *
1873 * @param pb_key The partially bound key. Example {42, '$1'}
1874 * @param iter An initialized root iterator.
1875 *
1876 * @return Pointer to found root pointer. May not be NULL.
1877 */
catree_find_prev_from_pb_key_root(Eterm key,CATreeRootIterator * iter)1878 TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key,
1879 CATreeRootIterator* iter)
1880 {
1881 #ifdef DEBUG
1882 DbTableCATreeNode *rejected_base = NULL;
1883 #endif
1884 DbTableCATreeNode *node;
1885 DbTableCATreeNode *parent;
1886 DbTableCATreeNode* next_route_node;
1887 int current_level;
1888
1889 ASSERT(!iter->locked_bnode);
1890
1891 while (1) {
1892 node = GET_ROOT_ACQB(iter->tb);
1893 current_level = 0;
1894 parent = NULL;
1895 next_route_node = NULL;
1896 while (!node->is_base_node) {
1897 current_level++;
1898 parent = node;
1899 if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) {
1900 next_route_node = node;
1901 node = GET_LEFT_ACQB(node);
1902 } else {
1903 node = GET_RIGHT_ACQB(node);
1904 }
1905 }
1906 ASSERT(node != rejected_base);
1907 lock_iter_base_node(iter, node, parent, current_level);
1908 if (node->u.base.is_valid) {
1909 iter->next_route_key = (next_route_node ?
1910 next_route_node->u.route.key.term :
1911 THE_NON_VALUE);
1912 return &node->u.base.root;
1913 }
1914 /* Retry */
1915 unlock_iter_base_node(iter);
1916 #ifdef DEBUG
1917 rejected_base = node;
1918 #endif
1919 }
1920 }
1921
catree_find_firstlast_root(CATreeRootIterator * iter,int first)1922 static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter,
1923 int first)
1924 {
1925 #ifdef DEBUG
1926 DbTableCATreeNode *rejected_base = NULL;
1927 #endif
1928 DbTableCATreeNode *node;
1929 DbTableCATreeNode* next_route_node;
1930 int current_level;
1931
1932 while (1) {
1933 node = GET_ROOT_ACQB(iter->tb);
1934 current_level = 0;
1935 next_route_node = NULL;
1936 while (!node->is_base_node) {
1937 current_level++;
1938 next_route_node = node;
1939 node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node);
1940 }
1941 ASSERT(node != rejected_base);
1942 lock_iter_base_node(iter, node, next_route_node, current_level);
1943 if (node->u.base.is_valid) {
1944 iter->next_route_key = (next_route_node ?
1945 next_route_node->u.route.key.term :
1946 THE_NON_VALUE);
1947 return &node->u.base.root;
1948 }
1949 /* Retry */
1950 unlock_iter_base_node(iter);
1951 #ifdef DEBUG
1952 rejected_base = node;
1953 #endif
1954 }
1955 }
1956
catree_find_first_root(CATreeRootIterator * iter)1957 TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter)
1958 {
1959 return catree_find_firstlast_root(iter, 1);
1960 }
1961
catree_find_last_root(CATreeRootIterator * iter)1962 TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter)
1963 {
1964 return catree_find_firstlast_root(iter, 0);
1965 }
1966
/*
 * Membership test for 'key' under a read lock on its base node.
 * Returns the result of db_member_tree_common().
 */
static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableCATree *catree = &tbl->catree;
    DbTableCATreeNode* bnode;
    int rc;

    bnode = find_rlock_valid_base_node(catree, key);
    rc = db_member_tree_common(&catree->common,
                               bnode->u.base.root,
                               key, ret, NULL);
    runlock_base_node(bnode, catree);
    return rc;
}
1977
/*
 * Fetch element 'ndex' of the object stored under 'key', under a read
 * lock on its base node. Returns the result of
 * db_get_element_tree_common().
 */
static int db_get_element_catree(Process *p, DbTable *tbl,
                                 Eterm key, int ndex, Eterm *ret)
{
    DbTableCATree *catree = &tbl->catree;
    DbTableCATreeNode* bnode;
    int rc;

    bnode = find_rlock_valid_base_node(catree, key);
    rc = db_get_element_tree_common(p, &catree->common,
                                    bnode->u.base.root,
                                    key, ndex, ret, NULL);
    runlock_base_node(bnode, catree);
    return rc;
}
1989
/*
 * Erase 'key' under a write lock on its base node; the node is released
 * with wunlock_adapt_base_node(). Returns the result of
 * db_erase_tree_common().
 */
static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableCATree *catree = &tbl->catree;
    FindBaseNode where;
    DbTableCATreeNode* bnode;
    int rc;

    bnode = find_wlock_valid_base_node(catree, key, &where);
    rc = db_erase_tree_common(tbl, &bnode->u.base.root, key,
                              ret, NULL);
    wunlock_adapt_base_node(catree, bnode, where.parent, where.current_level);
    return rc;
}
2000
/*
 * Erase the tuple 'object' under a write lock on the base node for its
 * key; the node is released with wunlock_adapt_base_node(). Returns the
 * result of db_erase_object_tree_common().
 */
static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
{
    DbTableCATree *catree = &tbl->catree;
    Eterm key = GETKEY(&catree->common, tuple_val(object));
    FindBaseNode where;
    DbTableCATreeNode* bnode;
    int rc;

    bnode = find_wlock_valid_base_node(catree, key, &where);
    rc = db_erase_object_tree_common(tbl,
                                     &bnode->u.base.root,
                                     object,
                                     ret,
                                     NULL);
    wunlock_adapt_base_node(catree, bnode, where.parent, where.current_level);
    return rc;
}
2015
2016
/*
 * Slot lookup: starts db_slot_tree_common() at the root of the first
 * base node and hands it the root iterator. Returns the result of
 * db_slot_tree_common().
 */
static int db_slot_catree(Process *p, DbTable *tbl,
                          Eterm slot_term, Eterm *ret)
{
    CATreeRootIterator iter;
    TreeDbTerm *first_root;
    int rc;

    init_root_iterator(&tbl->catree, &iter, 1);
    first_root = *catree_find_first_root(&iter);
    rc = db_slot_tree_common(p, tbl, first_root,
                             slot_term, ret, NULL, &iter);
    destroy_root_iterator(&iter);
    return rc;
}
2029
/*
 * Continue a select: runs db_select_continue_tree_common() with a fresh
 * root iterator and returns its result. 'safety_p' is not used.
 */
static int db_select_continue_catree(Process *p,
                                     DbTable *tbl,
                                     Eterm continuation,
                                     Eterm *ret,
                                     enum DbIterSafety* safety_p)
{
    CATreeRootIterator iter;
    int rc;

    init_root_iterator(&tbl->catree, &iter, 1);
    rc = db_select_continue_tree_common(p, &tbl->common,
                                        continuation, ret, NULL, &iter);
    destroy_root_iterator(&iter);
    return rc;
}
2045
/*
 * Start a select over the table using match specification 'pattern'.
 *
 * Wraps db_select_tree_common() with a root iterator that is initialised
 * (third argument 1) before the call and destroyed after it. 'reverse'
 * is forwarded as is; 'safety' is not used by this function.
 */
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
                            Eterm pattern, int reverse, Eterm *ret,
                            enum DbIterSafety safety)
{
    CATreeRootIterator roots;
    int status;

    init_root_iterator(&tbl->catree, &roots, 1);
    status = db_select_tree_common(p, tbl, tid, pattern, reverse, ret,
                                   NULL, &roots);
    destroy_root_iterator(&roots);

    return status;
}
2059
/*
 * Continue a previously started select_count from 'continuation'.
 *
 * Wraps db_select_count_continue_tree_common() with a root iterator that
 * is initialised (third argument 1) before the call and destroyed after
 * it. 'safety_p' is not used by this function.
 */
static int db_select_count_continue_catree(Process *p,
                                           DbTable *tbl,
                                           Eterm continuation,
                                           Eterm *ret,
                                           enum DbIterSafety* safety_p)
{
    CATreeRootIterator roots;
    int status;

    init_root_iterator(&tbl->catree, &roots, 1);
    status = db_select_count_continue_tree_common(p, tbl, continuation,
                                                  ret, NULL, &roots);
    destroy_root_iterator(&roots);

    return status;
}
2076
/*
 * Start a select_count over the table using 'pattern'.
 *
 * Wraps db_select_count_tree_common() with a root iterator that is
 * initialised (third argument 1) before the call and destroyed after it.
 * 'safety' is not used by this function.
 */
static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Eterm *ret,
                                  enum DbIterSafety safety)
{
    CATreeRootIterator roots;
    int status;

    init_root_iterator(&tbl->catree, &roots, 1);
    status = db_select_count_tree_common(p, tbl, tid, pattern, ret,
                                         NULL, &roots);
    destroy_root_iterator(&roots);

    return status;
}
2090
/*
 * Start a chunked select over the table using 'pattern'.
 *
 * 'chunk_size' and 'reversed' are forwarded unchanged to
 * db_select_chunk_tree_common(). The call is bracketed by the set-up
 * (third argument 1) and tear-down of a root iterator. 'safety' is not
 * used by this function.
 */
static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Sint chunk_size,
                                  int reversed, Eterm *ret,
                                  enum DbIterSafety safety)
{
    CATreeRootIterator roots;
    int status;

    init_root_iterator(&tbl->catree, &roots, 1);
    status = db_select_chunk_tree_common(p, tbl, tid, pattern, chunk_size,
                                         reversed, ret, NULL, &roots);
    destroy_root_iterator(&roots);

    return status;
}
2106
/*
 * Continue a previously started select_delete from 'continuation'.
 *
 * Unlike the plain select wrappers in this file, the root iterator is
 * initialised with third argument 0 and a local tree stack (backed by a
 * STACK_NEED sized array) is supplied to the common implementation.
 * 'safety_p' is not used by this function.
 */
static int db_select_delete_continue_catree(Process *p,
                                            DbTable *tbl,
                                            Eterm continuation,
                                            Eterm *ret,
                                            enum DbIterSafety* safety_p)
{
    TreeDbTerm *slots[STACK_NEED];
    DbTreeStack tree_stack;
    CATreeRootIterator roots;
    int status;

    init_root_iterator(&tbl->catree, &roots, 0);
    init_tree_stack(&tree_stack, slots, 0);
    status = db_select_delete_continue_tree_common(p, tbl, continuation, ret,
                                                   &tree_stack, &roots);
    destroy_root_iterator(&roots);

    return status;
}
2125
/*
 * Start a select_delete over the table using 'pattern'.
 *
 * The root iterator is initialised with third argument 0 and a local
 * tree stack (backed by a STACK_NEED sized array) is supplied to
 * db_select_delete_tree_common(). 'safety' is not used by this function.
 */
static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
                                   Eterm pattern, Eterm *ret,
                                   enum DbIterSafety safety)
{
    TreeDbTerm *slots[STACK_NEED];
    DbTreeStack tree_stack;
    CATreeRootIterator roots;
    int status;

    init_root_iterator(&tbl->catree, &roots, 0);
    init_tree_stack(&tree_stack, slots, 0);
    status = db_select_delete_tree_common(p, tbl, tid, pattern, ret,
                                          &tree_stack, &roots);
    destroy_root_iterator(&roots);

    return status;
}
2143
/*
 * Start a select_replace over the table using 'pattern'.
 *
 * Wraps db_select_replace_tree_common() with a root iterator that is
 * initialised with third argument 0 before the call and destroyed after
 * it. 'safety_p' is not used by this function.
 */
static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
                                    Eterm pattern, Eterm *ret,
                                    enum DbIterSafety safety_p)
{
    CATreeRootIterator roots;
    int status;

    init_root_iterator(&tbl->catree, &roots, 0);
    status = db_select_replace_tree_common(p, tbl, tid, pattern, ret,
                                           NULL, &roots);
    destroy_root_iterator(&roots);

    return status;
}
2157
/*
 * Continue a previously started select_replace from 'continuation'.
 *
 * Wraps db_select_replace_continue_tree_common() with a root iterator
 * that is initialised with third argument 0 before the call and
 * destroyed after it. 'safety_p' is not used by this function.
 */
static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
                                             Eterm continuation, Eterm *ret,
                                             enum DbIterSafety* safety_p)
{
    CATreeRootIterator roots;
    int status;

    init_root_iterator(&tbl->catree, &roots, 0);
    status = db_select_replace_continue_tree_common(p, tbl, continuation,
                                                    ret, NULL, &roots);
    destroy_root_iterator(&roots);

    return status;
}
2171
/*
 * Take operation for 'key'.
 *
 * The base node responsible for 'key' is obtained through
 * find_wlock_valid_base_node(), the work is delegated to
 * db_take_tree_common() on that node's tree root, and the node is
 * handed back through wunlock_adapt_base_node() before returning.
 *
 * Returns the value produced by db_take_tree_common().
 */
static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableCATree *catree = &tbl->catree;
    FindBaseNode where;
    DbTableCATreeNode *base;
    int status;

    base = find_wlock_valid_base_node(catree, key, &where);
    status = db_take_tree_common(p, tbl, &base->u.base.root, key, ret, NULL);
    wunlock_adapt_base_node(catree, base, where.parent, where.current_level);

    return status;
}
2182
2183 /*
2184 ** Other interface routines (not directly coupled to one bif)
2185 */
2186
2187
2188 /* Display tree contents (for dump) */
db_print_catree(fmtfn_t to,void * to_arg,int show,DbTable * tbl)2189 static void db_print_catree(fmtfn_t to, void *to_arg,
2190 int show, DbTable *tbl)
2191 {
2192 CATreeRootIterator iter;
2193 TreeDbTerm** root;
2194
2195 init_root_iterator(&tbl->catree, &iter, 1);
2196 root = catree_find_first_root(&iter);
2197 do {
2198 db_print_tree_common(to, to_arg, show, *root, tbl);
2199 root = catree_find_next_root(&iter, NULL);
2200 } while (root);
2201 destroy_root_iterator(&iter);
2202 }
2203
2204 /* Release all memory occupied by a single table */
db_free_table_catree(DbTable * tbl)2205 static int db_free_table_catree(DbTable *tbl)
2206 {
2207 while (db_free_table_continue_catree(tbl, ERTS_SWORD_MAX) < 0)
2208 ;
2209 return 1;
2210 }
2211
2212 static
db_catree_nr_of_items_deleted_wb_dtor(Binary * context_bin)2213 int db_catree_nr_of_items_deleted_wb_dtor(Binary *context_bin) {
2214 (void)context_bin;
2215 return 1;
2216 }
2217
/*
 * Payload of the magic binary created by
 * create_and_install_num_of_deleted_items_wb_bin().
 */
typedef struct {
    /* Initialised to 0; set from tb->nr_of_deleted_items when
     * db_delete_all_objects_catree() completes. */
    Uint nr_of_deleted_items;
} DbCATreeNrOfItemsDeletedWb;
2221
2222 static Eterm
create_and_install_num_of_deleted_items_wb_bin(Process * p,DbTableCATree * tb)2223 create_and_install_num_of_deleted_items_wb_bin(Process *p, DbTableCATree *tb)
2224 {
2225 Binary* bin =
2226 erts_create_magic_binary(sizeof(DbCATreeNrOfItemsDeletedWb),
2227 db_catree_nr_of_items_deleted_wb_dtor);
2228 DbCATreeNrOfItemsDeletedWb* data = ERTS_MAGIC_BIN_DATA(bin);
2229 Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
2230 Eterm mref = erts_mk_magic_ref(&hp, &MSO(p), bin);
2231 data->nr_of_deleted_items = 0;
2232 tb->nr_of_deleted_items_wb = bin;
2233 erts_refc_inctest(&bin->intern.refc, 2);
2234 return mref;
2235 }
2236
/*
 * Read the deleted-items counter out of the magic binary referenced by
 * 'mref' and return it as an integer term built for process 'p'.
 */
static Eterm db_delete_all_objects_get_nitems_from_holder_catree(Process* p,
                                                                 Eterm mref)
{
    Binary *holder = erts_magic_ref2bin(mref);
    DbCATreeNrOfItemsDeletedWb *counter = ERTS_MAGIC_BIN_DATA(holder);
    Uint deleted = counter->nr_of_deleted_items;

    return erts_make_integer(deleted, p);
}
2244
/*
 * Delete all objects in the table, using at most 'reds' reductions per
 * call.
 *
 * When tb->deletion is not set, a counter-holding magic binary is
 * created first and a reference to it is stored in '*nitems_holder_wb'.
 * The actual freeing is done by db_free_table_continue_catree(). A
 * negative result from it is returned immediately (presumably meaning
 * that more work remains -- confirm against that function).
 *
 * Otherwise the table's deleted-items count is copied into the magic
 * binary, the table's reference to that binary is released, and the
 * table is re-created with db_create_catree(). The non-negative value
 * from db_free_table_continue_catree() is returned.
 */
static SWord db_delete_all_objects_catree(Process* p,
                                          DbTable* tbl,
                                          SWord reds,
                                          Eterm* nitems_holder_wb)
{
    DbTableCATree *catree = &tbl->catree;
    DbCATreeNrOfItemsDeletedWb *counter;

    if (!catree->deletion)
        *nitems_holder_wb =
            create_and_install_num_of_deleted_items_wb_bin(p, catree);

    reds = db_free_table_continue_catree(tbl, reds);
    if (reds >= 0) {
        counter = ERTS_MAGIC_BIN_DATA(catree->nr_of_deleted_items_wb);
        counter->nr_of_deleted_items = catree->nr_of_deleted_items;
        erts_bin_release(catree->nr_of_deleted_items_wb);
        db_create_catree(p, tbl);
    }
    return reds;
}
2265
2266
/*
 * Apply 'func' to the off-heap list of every route node key in the
 * subtree rooted at 'node', visiting left subtree, node, right subtree.
 *
 * 'node' is accessed as a route node (u.route, GET_LEFT, GET_RIGHT);
 * children flagged as base nodes are not descended into. For each route
 * node 'func' receives a temporary ErlOffHeap whose 'first' is the
 * key's off-heap list and whose 'overhead' is 0, together with 'arg'.
 */
static void do_for_route_nodes(DbTableCATreeNode* node,
                               void (*func)(ErlOffHeap *, void *),
                               void *arg)
{
    ErlOffHeap key_oh;

    if (!GET_LEFT(node)->is_base_node) {
        do_for_route_nodes(GET_LEFT(node), func, arg);
    }

    key_oh.first = node->u.route.key.oh;
    key_oh.overhead = 0;
    func(&key_oh, arg);

    if (!GET_RIGHT(node)->is_base_node) {
        do_for_route_nodes(GET_RIGHT(node), func, arg);
    }
}
2283
/*
 * Apply 'func' (with 'arg') to the off-heap data reachable from the
 * table.
 *
 * A table without a root is only expected while it is being deleted
 * (asserted via DB_DELETE); in that case nothing is done. Otherwise
 * every root produced by the root iterator is passed to
 * db_foreach_offheap_tree_common(), after which do_for_route_nodes()
 * is run from the table root.
 */
static void db_foreach_offheap_catree(DbTable *tbl,
                                      void (*func)(ErlOffHeap *, void *),
                                      void *arg)
{
    DbTableCATree *catree = &tbl->catree;
    CATreeRootIterator roots;
    TreeDbTerm **cursor;

    if (!GET_ROOT(catree)) {
        ASSERT(catree->common.status & DB_DELETE);
        return;
    }

    init_root_iterator(catree, &roots, 1);
    cursor = catree_find_first_root(&roots);
    for (;;) {
        db_foreach_offheap_tree_common(*cursor, func, arg);
        cursor = catree_find_next_root(&roots, NULL);
        if (!cursor)
            break;
    }
    destroy_root_iterator(&roots);

    do_for_route_nodes(GET_ROOT(catree), func, arg);
}
2306
/*
 * Look up the term for 'key' and prepare 'handle' for updating it.
 *
 * The base node for 'key' is obtained through
 * find_wlock_valid_base_node() and the lookup is delegated to
 * db_lookup_dbterm_tree_common(). If that returns 0 the base node is
 * handed back immediately. Otherwise the node, its parent and the
 * current level are stored in 'handle' and unlocking is left to
 * db_finalize_dbterm_catree().
 *
 * Returns the value produced by db_lookup_dbterm_tree_common().
 */
static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
                                   DbUpdateHandle *handle)
{
    DbTableCATree *catree = &tbl->catree;
    FindBaseNode where;
    DbTableCATreeNode *base;
    int found;

    base = find_wlock_valid_base_node(catree, key, &where);
    found = db_lookup_dbterm_tree_common(p, tbl, &base->u.base.root, key,
                                         obj, handle, NULL);
    if (found == 0) {
        wunlock_adapt_base_node(catree, base,
                                where.parent, where.current_level);
        return found;
    }

    /* db_finalize_dbterm_catree will unlock */
    handle->u.catree.base_node = base;
    handle->u.catree.parent = where.parent;
    handle->u.catree.current_level = where.current_level;
    return found;
}
2325
/*
 * Finish an update started by db_lookup_dbterm_catree().
 *
 * Delegates to db_finalize_dbterm_tree_common() on the tree root of the
 * base node recorded in 'handle', then hands that base node back through
 * wunlock_adapt_base_node() using the parent and level also recorded in
 * 'handle'.
 */
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle)
{
    DbTableCATree *catree = &handle->tb->catree;

    db_finalize_dbterm_tree_common(cret, handle,
                                   &handle->u.catree.base_node->u.base.root,
                                   NULL);

    wunlock_adapt_base_node(catree,
                            handle->u.catree.base_node,
                            handle->u.catree.parent,
                            handle->u.catree.current_level);
}
2338
/*
 * Produce binary info for the object stored under 'key'.
 *
 * The base node for 'key' is obtained through
 * find_wlock_valid_base_node(), the tree node is located with
 * db_find_tree_node_common(), and the result of
 * db_binary_info_tree_common() is stored in '*ret'. The base node is
 * released with wunlock_base_node(), not the adapting variant used by
 * the other key operations in this file.
 *
 * Always returns DB_ERROR_NONE.
 */
static int db_get_binary_info_catree(Process *p, DbTable *tbl, Eterm key,
                                     Eterm *ret)
{
    DbTableCATree *catree = &tbl->catree;
    FindBaseNode where;
    DbTableCATreeNode *base;
    TreeDbTerm *entry;

    base = find_wlock_valid_base_node(catree, key, &where);
    entry = db_find_tree_node_common(&tbl->common, base->u.base.root, key);
    *ret = db_binary_info_tree_common(p, entry);
    wunlock_base_node(base);

    return DB_ERROR_NONE;
}
2351
2352 #ifdef ERTS_ENABLE_LOCK_COUNT
/*
 * Install or uninstall lock-count info for the lock of 'node' and, when
 * 'node' is a route node, for every node below it.
 *
 * Both children of a route node are processed before the route node's
 * own lock. Base node locks are registered with type
 * ERTS_LOCK_TYPE_RWMUTEX and route node locks with ERTS_LOCK_TYPE_MUTEX,
 * both in the DB category and named after the table.
 */
static void erts_lcnt_enable_db_catree_lock_count_helper(DbTableCATree *tb,
                                                         DbTableCATreeNode *node,
                                                         int enable)
{
    erts_lcnt_ref_t *ref;
    erts_lock_flags_t type;

    if (!node->is_base_node) {
        erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_LEFT(node), enable);
        erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_RIGHT(node), enable);
        ref = &GET_ROUTE_NODE_LOCK(node)->lcnt;
        type = ERTS_LOCK_TYPE_MUTEX;
    } else {
        ref = &GET_BASE_NODE_LOCK(node)->lcnt;
        type = ERTS_LOCK_TYPE_RWMUTEX;
    }

    if (!enable) {
        erts_lcnt_uninstall(ref);
        return;
    }
    erts_lcnt_install_new_lock_info(ref, "db_hash_slot", tb->common.the_name,
                                    type | ERTS_LOCK_FLAGS_CATEGORY_DB);
}
2375
/*
 * Enable (non-zero 'enable') or disable lock counting for every node
 * lock in the table by running the recursive helper from the table root.
 */
void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable)
{
    DbTableCATreeNode *top = GET_ROOT(tb);

    erts_lcnt_enable_db_catree_lock_count_helper(tb, top, enable);
}
2380 #endif /* ERTS_ENABLE_LOCK_COUNT */
2381
/*
 * Turn forced splitting on or off for the whole table.
 *
 * For each root produced by the root iterator, the statistics value of
 * the iterator's currently locked base node is set to INT_MAX ('on') or
 * 0 (off). Afterwards DB_CATREE_FORCE_SPLIT is set or cleared in the
 * table status to match.
 */
void db_catree_force_split(DbTableCATree* tb, int on)
{
    CATreeRootIterator roots;
    TreeDbTerm **cursor;

    init_root_iterator(tb, &roots, 1);
    cursor = catree_find_first_root(&roots);
    for (;;) {
        BASE_NODE_STAT_SET(roots.locked_bnode, (on ? INT_MAX : 0));
        cursor = catree_find_next_root(&roots, NULL);
        if (!cursor)
            break;
    }
    destroy_root_iterator(&roots);

    if (!on) {
        tb->common.status &= ~DB_CATREE_FORCE_SPLIT;
        return;
    }
    tb->common.status |= DB_CATREE_FORCE_SPLIT;
}
2400
/*
 * Set ('on' non-zero) or clear the DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN
 * flag in the table status. Nothing else is touched.
 */
void db_catree_debug_random_split_join(DbTableCATree* tb, int on)
{
    if (!on) {
        tb->common.status &= ~DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN;
        return;
    }
    tb->common.status |= DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN;
}
2408
/*
 * Count the route nodes and base nodes of the table and record the
 * deepest route-node nesting seen.
 *
 * The tree is walked without recursion, using an explicit stack of the
 * route nodes on the path from the root to the current node. The stack
 * has room for ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT entries, which
 * is asserted on every push.
 *
 * All three fields of 'stats' are reset to 0 before counting.
 */
void db_calc_stats_catree(DbTableCATree* tb, DbCATreeStats* stats)
{
    DbTableCATreeNode *path[ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT];
    DbTableCATreeNode *cur;
    Uint sp = 0;

    stats->route_nodes = 0;
    stats->base_nodes = 0;
    stats->max_depth = 0;

    cur = GET_ROOT(tb);
    for (;;) {
        /* Descend along left children, pushing each route node passed. */
        for (; !cur->is_base_node; cur = GET_LEFT(cur)) {
            stats->route_nodes++;
            ASSERT(sp < sizeof(path)/sizeof(*path));
            path[sp] = cur;
            sp++;
            if (stats->max_depth < sp)
                stats->max_depth = sp;
        }
        stats->base_nodes++;

        /* Climb while we are coming up from a right child. */
        while (sp > 0 && cur != GET_LEFT(path[sp-1])) {
            ASSERT(cur == GET_RIGHT(path[sp-1]));
            cur = path[sp-1];
            sp--;
        }

        if (sp == 0)
            break; /* climbed past the root: every node has been seen */

        /* Came up from a left child: continue with its right sibling. */
        cur = GET_RIGHT(path[sp-1]);
    }
}
2445
/*
 * Callback and argument pair that
 * erts_db_foreach_thr_prgr_offheap_catree() passes to
 * debug_free_route_node().
 */
struct debug_catree_fa {
    void (*func)(ErlOffHeap *, void *); /* callback applied to an off-heap list */
    void *arg;                          /* second argument given to 'func' */
};
2450
/*
 * Callback handed to erts_debug_later_op_foreach().
 *
 * 'vnp' is treated as a route node and 'vfap' as a
 * struct debug_catree_fa. If the node's key has off-heap data, the
 * stored function is called with an initialised ErlOffHeap whose
 * 'first' is that data. 'val' is not used.
 */
static void debug_free_route_node(void *vfap, ErtsThrPrgrVal val, void *vnp)
{
    DbTableCATreeNode *route = vnp;
    struct debug_catree_fa *cb;
    ErlOffHeap key_oh;

    if (!route->u.route.key.oh)
        return;

    cb = vfap;
    ERTS_INIT_OFF_HEAP(&key_oh);
    key_oh.first = route->u.route.key.oh;
    cb->func(&key_oh, cb->arg);
}
2462
2463 void
erts_db_foreach_thr_prgr_offheap_catree(void (* func)(ErlOffHeap *,void *),void * arg)2464 erts_db_foreach_thr_prgr_offheap_catree(void (*func)(ErlOffHeap *, void *),
2465 void *arg)
2466 {
2467 struct debug_catree_fa fa;
2468 fa.func = func;
2469 fa.arg = arg;
2470 erts_debug_later_op_foreach(do_free_route_node, debug_free_route_node, &fa);
2471 }
2472
2473
2474 #ifdef HARDDEBUG
2475
2476 /*
2477 * Not called, but kept as it might come to use
2478 */
my_check_table_tree(TreeDbTerm * t)2479 static inline int my_check_table_tree(TreeDbTerm *t)
2480 {
2481 int lh, rh;
2482 if (t == NULL)
2483 return 0;
2484 lh = my_check_table_tree(t->left);
2485 rh = my_check_table_tree(t->right);
2486 if ((rh - lh) != t->balance) {
2487 erts_fprintf(stderr, "Invalid tree balance for this node:\n");
2488 erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n",
2489 t->balance, t->left, t->right);
2490 erts_fprintf(stderr,"\nDump:\n---------------------------------\n");
2491 erts_fprintf(stderr,"\n---------------------------------\n");
2492 abort();
2493 }
2494 return ((rh > lh) ? rh : lh) + 1;
2495 }
2496
2497 #endif
2498