1 /*
2  * %CopyrightBegin%
3  *
4  * Copyright Ericsson AB 1998-2020. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * %CopyrightEnd%
19  */
20 
21 /*
22 ** Implementation of ordered ETS tables.
23 ** The tables are implemented as AVL trees (Published by Adelson-Velski
24 ** and Landis). A nice source for learning about these trees is
25 ** Wirth's Algorithms + Datastructures = Programs.
26 ** The implementation here is however not made with recursion
27 ** as the examples in Wirths book are.
28 */
29 
30 /*
31 #ifdef DEBUG
32 #define HARDDEBUG 1
33 #endif
34 */
35 #ifdef HAVE_CONFIG_H
36 #  include "config.h"
37 #endif
38 
39 #include "sys.h"
40 #include "erl_vm.h"
41 #include "global.h"
42 #include "erl_process.h"
43 #include "error.h"
44 #define ERTS_WANT_DB_INTERNAL__
45 #include "erl_db.h"
46 #include "bif.h"
47 #include "big.h"
48 #include "erl_binary.h"
49 
50 #include "erl_db_tree.h"
51 #include "erl_db_tree_util.h"
52 
53 #define GETKEY_WITH_POS(Keypos, Tplp) (*((Tplp) + Keypos))
54 
55 #define NITEMS_CENTRALIZED(tb)                                          \
56     ((Sint)erts_flxctr_read_centralized(&(tb)->common.counters,          \
57                                         ERTS_DB_TABLE_NITEMS_COUNTER_ID))
58 #define ADD_NITEMS(DB, TO_ADD)                                          \
59     erts_flxctr_add(&(DB)->common.counters, ERTS_DB_TABLE_NITEMS_COUNTER_ID, TO_ADD)
60 #define INC_NITEMS(DB)                                                  \
61     erts_flxctr_inc(&(DB)->common.counters, ERTS_DB_TABLE_NITEMS_COUNTER_ID)
62 #define INC_NITEMS_CENTRALIZED(DB)                                      \
63     erts_flxctr_inc_read_centralized(&(DB)->common.counters, ERTS_DB_TABLE_NITEMS_COUNTER_ID)
64 #define RESET_NITEMS(DB)                                                \
65     erts_flxctr_reset(&(DB)->common.counters, ERTS_DB_TABLE_NITEMS_COUNTER_ID)
66 #define IS_CENTRALIZED_CTR(tb) (!(tb)->common.counters.is_decentralized)
67 #define APPROX_MEM_CONSUMED(tb) erts_flxctr_read_approx(&(tb)->common.counters, ERTS_DB_TABLE_MEM_COUNTER_ID)
68 
69 #define TOPN_NODE(Dtt, Pos)                   \
70      (((Pos) < Dtt->pos) ? 			\
71       (Dtt)->array[(Dtt)->pos - ((Pos) + 1)] : NULL)
72 
73 #define REPLACE_TOP_NODE(Dtt, Node)          \
74      if ((Dtt)->pos) (Dtt)->array[(Dtt)->pos - 1] = (Node)
75 
76 #define EMPTY_NODE(Dtt) (TOP_NODE(Dtt) == NULL)
77 
78 
79 /* Obtain table static stack if available. NULL if not.
80 ** Must be released with release_stack()
81 */
get_static_stack(DbTableTree * tb)82 ERTS_INLINE static DbTreeStack* get_static_stack(DbTableTree* tb)
83 {
84     if (tb != NULL) {
85         ASSERT(IS_TREE_TABLE(tb->common.type));
86         if (!erts_atomic_xchg_acqb(&tb->is_stack_busy, 1))
87             return &tb->static_stack;
88     }
89     return NULL;
90 }
91 
92 /* Obtain static stack if available, otherwise empty dynamic stack.
93 ** Must be released with release_stack()
94 */
get_any_stack(DbTable * tb,DbTableTree * stack_container)95 static DbTreeStack* get_any_stack(DbTable* tb, DbTableTree* stack_container)
96 {
97     DbTreeStack* stack;
98     if (stack_container != NULL) {
99         ASSERT(IS_TREE_TABLE(stack_container->common.type));
100         if (!erts_atomic_xchg_acqb(&stack_container->is_stack_busy, 1))
101             return &stack_container->static_stack;
102     }
103     stack = erts_db_alloc(ERTS_ALC_T_DB_STK, tb,
104 			  sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
105     stack->pos = 0;
106     stack->slot = 0;
107     stack->array = (TreeDbTerm**) (stack + 1);
108     return stack;
109 }
110 
release_stack(DbTable * tb,DbTableTree * stack_container,DbTreeStack * stack)111 static void release_stack(DbTable* tb, DbTableTree* stack_container, DbTreeStack* stack)
112 {
113     if (stack_container != NULL) {
114         ASSERT(IS_TREE_TABLE(stack_container->common.type));
115         if (stack == &stack_container->static_stack) {
116             ASSERT(erts_atomic_read_nob(&stack_container->is_stack_busy) == 1);
117             erts_atomic_set_relb(&stack_container->is_stack_busy, 0);
118             return;
119         }
120     }
121     erts_db_free(ERTS_ALC_T_DB_STK, tb,
122                  (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
123 }
124 
reset_stack(DbTreeStack * stack)125 static ERTS_INLINE void reset_stack(DbTreeStack* stack)
126 {
127     if (stack != NULL) {
128         stack->pos = 0;
129         stack->slot = 0;
130     }
131 }
132 
reset_static_stack(DbTableTree * tb)133 static ERTS_INLINE void reset_static_stack(DbTableTree* tb)
134 {
135     if (tb != NULL) {
136         ASSERT(IS_TREE_TABLE(tb->common.type));
137         reset_stack(&tb->static_stack);
138     }
139 }
140 
new_dbterm(DbTableCommon * tb,Eterm obj)141 static ERTS_INLINE TreeDbTerm* new_dbterm(DbTableCommon *tb, Eterm obj)
142 {
143     TreeDbTerm* p;
144     if (tb->compress) {
145 	p = db_store_term_comp(tb, tb->keypos, NULL, offsetof(TreeDbTerm,dbterm), obj);
146     }
147     else {
148 	p = db_store_term(tb, NULL, offsetof(TreeDbTerm,dbterm), obj);
149     }
150     return p;
151 }
152 
new_dbterm_no_tab(int compress,int keypos,Eterm obj)153 static ERTS_INLINE TreeDbTerm* new_dbterm_no_tab(int compress, int keypos, Eterm obj)
154 {
155     TreeDbTerm* p;
156     if (compress) {
157 	p = db_store_term_comp(NULL, keypos, NULL, offsetof(TreeDbTerm,dbterm), obj);
158     }
159     else {
160 	p = db_store_term(NULL, NULL, offsetof(TreeDbTerm,dbterm), obj);
161     }
162     return p;
163 }
164 
replace_dbterm(DbTableCommon * tb,TreeDbTerm * old,Eterm obj)165 static ERTS_INLINE TreeDbTerm* replace_dbterm(DbTableCommon *tb, TreeDbTerm* old,
166 					      Eterm obj)
167 {
168     TreeDbTerm* p;
169     ASSERT(old != NULL);
170     if (tb->compress) {
171 	p = db_store_term_comp(tb, tb->keypos, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
172     }
173     else {
174 	p = db_store_term(tb, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
175     }
176     return p;
177 }
178 
179 /*
180  * Number of records to delete before trapping.
181  */
182 #define DELETE_RECORD_LIMIT 12000
183 
184 /*
185 ** Debugging
186 */
187 #ifdef HARDDEBUG
188 static TreeDbTerm *traverse_until(TreeDbTerm *t, int *current, int to);
189 static void check_slot_pos(DbTableTree *tb);
190 static void check_saved_stack(DbTableTree *tb);
191 
192 #define TREE_DEBUG
193 #endif
194 
195 #ifdef TREE_DEBUG
196 /*
197 ** Primitive trace macro
198 */
199 #define DBG erts_fprintf(stderr,"%d\n",__LINE__)
200 
201 /*
202 ** Debugging dump
203 */
204 
205 static void do_dump_tree2(DbTableTree*, int to, void *to_arg, int show,
206 			  TreeDbTerm *t, int offset);
207 
208 #else
209 
210 #define DBG /* nothing */
211 
212 #endif
213 
214 /*
215 ** Datatypes
216 */
217 
218 enum ms_key_boundness {
219     /* Order significant, larger means more "boundness" => less iteration */
220     MS_KEY_UNBOUND           = 0,
221     MS_KEY_PARTIALLY_BOUND   = 1,
222     MS_KEY_BOUND             = 2,
223     MS_KEY_IMPOSSIBLE        = 3
224 };
225 
226 /*
227  * This structure is filled in by analyze_pattern() for the select
228  * functions.
229  */
230 struct mp_info {
231     enum ms_key_boundness key_boundness;
232     Eterm least;		/* The lowest matching key (possibly
233 				 * partially bound expression) */
234     Eterm most;                 /* The highest matching key (possibly
235 				 * partially bound expression) */
236     Binary *mp;                 /* The compiled match program */
237 };
238 
239 struct select_common {
240     TreeDbTerm **root;
241 };
242 
243 
244 /*
245  * Used by doit_select(_chunk)
246  */
247 struct select_context {
248     struct select_common common;
249     Process *p;
250     Eterm accum;
251     Binary *mp;
252     Eterm end_condition;
253     Eterm *lastobj;
254     Sint32 max;
255     int keypos;
256     Sint got;
257     Sint chunk_size;
258 };
259 
260 /*
261  * Used by doit_select_count
262  */
263 struct select_count_context {
264     struct select_common common;
265     Process *p;
266     Binary *mp;
267     Eterm end_condition;
268     Eterm *lastobj;
269     Sint32 max;
270     int keypos;
271     Sint got;
272 };
273 
274 /*
275  * Used by doit_select_delete
276  */
277 struct select_delete_context {
278     struct select_common common;
279     Process *p;
280     DbTableCommon *tb;
281     DbTreeStack *stack;
282     Uint accum;
283     Binary *mp;
284     Eterm end_condition;
285     int erase_lastterm;
286     TreeDbTerm *lastterm;
287     Sint32 max;
288     int keypos;
289 };
290 
291 /*
292  * Used by doit_select_replace
293  */
294 struct select_replace_context {
295     struct select_common common;
296     Process *p;
297     DbTableCommon *tb;
298     Binary *mp;
299     Eterm end_condition;
300     Eterm *lastobj;
301     Sint32 max;
302     int keypos;
303     Sint replaced;
304 };
305 
306 /* Used by select_replace on analyze_pattern */
307 typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Eterm body);
308 
309 /*
310 ** Forward declarations
311 */
312 static TreeDbTerm *linkout_tree(DbTableCommon *tb, TreeDbTerm **root,
313                                 Eterm key, DbTreeStack *stack);
314 static TreeDbTerm *linkout_object_tree(DbTableCommon *tb,  TreeDbTerm **root,
315 				       Eterm object, DbTableTree *stack);
316 static SWord do_free_tree_continue(DbTableTree *tb, SWord reds);
317 static void free_term(DbTable *tb, TreeDbTerm* p);
318 int tree_balance_left(TreeDbTerm **this);
319 int tree_balance_right(TreeDbTerm **this);
320 static int delsub(TreeDbTerm **this);
321 static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, Sint slot,
322                                DbTable *tb, DbTableTree *stack_container,
323                                CATreeRootIterator *iter, int* is_EOT);
324 static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root,
325                              Eterm key, DbTableTree *stack_container);
326 static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key);
327 static TreeDbTerm **find_ptr(DbTableCommon *tb, TreeDbTerm **root,
328                              DbTreeStack *stack, TreeDbTerm *this);
329 static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
330                              DbTreeStack* stack, Eterm key);
331 static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
332                              DbTreeStack* stack, Eterm key);
333 static TreeDbTerm *find_next_from_pb_key(DbTable*, TreeDbTerm*** rootpp,
334                                          DbTreeStack* stack, Eterm key,
335                                          CATreeRootIterator*);
336 static TreeDbTerm *find_prev_from_pb_key(DbTable*,  TreeDbTerm*** rootpp,
337                                          DbTreeStack* stack, Eterm key,
338                                          CATreeRootIterator*);
339 typedef int traverse_doit_funcT(DbTableCommon*, TreeDbTerm*,
340                                 struct select_common*, int forward);
341 
342 static void traverse_backwards(DbTableCommon *tb,
343 			       DbTreeStack*,
344 			       Eterm lastkey,
345 			       traverse_doit_funcT*,
346 			       struct select_common *context,
347                                CATreeRootIterator*);
348 static void traverse_forward(DbTableCommon *tb,
349 			     DbTreeStack*,
350 			     Eterm lastkey,
351                              traverse_doit_funcT*,
352 			     struct select_common *context,
353                              CATreeRootIterator*);
354 static void traverse_update_backwards(DbTableCommon *tb,
355                                       DbTreeStack*,
356                                       Eterm lastkey,
357                                       int (*doit)(DbTableCommon *tb,
358                                                   TreeDbTerm **, // out
359                                                   struct select_common*,
360                                                   int),
361                                       struct select_common*,
362                                       CATreeRootIterator*);
363 static enum ms_key_boundness key_boundness(DbTableCommon *tb,
364                                            Eterm pattern, Eterm *keyp);
365 static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done);
366 
367 static int analyze_pattern(DbTableCommon *tb, Eterm pattern,
368                            extra_match_validator_t extra_validator, /* Optional callback */
369                            struct mp_info *mpi);
370 static int doit_select(DbTableCommon *tb,
371                        TreeDbTerm *this,
372                        struct select_common* ptr,
373 		       int forward);
374 static int doit_select_count(DbTableCommon *tb,
375 			     TreeDbTerm *this,
376                              struct select_common*,
377 			     int forward);
378 static int doit_select_chunk(DbTableCommon *tb,
379 			     TreeDbTerm *this,
380                              struct select_common*,
381 			     int forward);
382 static int doit_select_delete(DbTableCommon *tb,
383 			      TreeDbTerm *this,
384 			      struct select_common*,
385 			      int forward);
386 static int doit_select_replace(DbTableCommon *tb,
387                                TreeDbTerm **this_ptr,
388                                struct select_common*,
389                                int forward);
390 
391 static int partly_bound_can_match_lesser(Eterm partly_bound_1,
392 					 Eterm partly_bound_2);
393 static int partly_bound_can_match_greater(Eterm partly_bound_1,
394 					  Eterm partly_bound_2);
395 static int do_partly_bound_can_match_lesser(Eterm a, Eterm b,
396 					    int *done);
397 static int do_partly_bound_can_match_greater(Eterm a, Eterm b,
398 					     int *done);
399 static BIF_RETTYPE ets_select_reverse(BIF_ALIST_3);
400 
401 
402 /* Method interface functions */
403 static int db_first_tree(Process *p, DbTable *tbl,
404 		  Eterm *ret);
405 static int db_next_tree(Process *p, DbTable *tbl,
406 			Eterm key, Eterm *ret);
407 static int db_last_tree(Process *p, DbTable *tbl,
408 			Eterm *ret);
409 static int db_prev_tree(Process *p, DbTable *tbl,
410 			Eterm key,
411 			Eterm *ret);
412 static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail, SWord *consumed_reds_p);
413 static int db_get_tree(Process *p, DbTable *tbl,
414 		       Eterm key,  Eterm *ret);
415 static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret);
416 static int db_get_element_tree(Process *p, DbTable *tbl,
417 			       Eterm key,int ndex,
418 			       Eterm *ret);
419 static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret);
420 static int db_erase_object_tree(DbTable *tbl, Eterm object,Eterm *ret);
421 static int db_slot_tree(Process *p, DbTable *tbl,
422 			Eterm slot_term,  Eterm *ret);
423 static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
424 			  Eterm pattern, int reversed, Eterm *ret,
425                           enum DbIterSafety);
426 static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
427 				Eterm pattern,  Eterm *ret, enum DbIterSafety);
428 static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
429 				Eterm pattern, Sint chunk_size,
430 				int reversed, Eterm *ret, enum DbIterSafety);
431 static int db_select_continue_tree(Process *p, DbTable *tbl,
432 				   Eterm continuation, Eterm *ret,
433                                    enum DbIterSafety*);
434 static int db_select_count_continue_tree(Process *p, DbTable *tbl,
435 					 Eterm continuation, Eterm *ret,
436                                          enum DbIterSafety*);
437 static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
438 				 Eterm pattern,  Eterm *ret,
439                                  enum DbIterSafety);
440 static int db_select_delete_continue_tree(Process *p, DbTable *tbl,
441 					  Eterm continuation, Eterm *ret,
442                                           enum DbIterSafety*);
443 static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
444                                   Eterm pattern, Eterm *ret,
445                                   enum DbIterSafety);
446 static int db_select_replace_continue_tree(Process *p, DbTable *tbl,
447                                            Eterm continuation, Eterm *ret,
448                                            enum DbIterSafety*);
449 static int db_take_tree(Process *, DbTable *, Eterm, Eterm *);
450 static void db_print_tree(fmtfn_t to, void *to_arg,
451 			  int show, DbTable *tbl);
452 static int db_free_empty_table_tree(DbTable *tbl);
453 
454 static SWord db_free_table_continue_tree(DbTable *tbl, SWord);
455 
456 static void db_foreach_offheap_tree(DbTable *,
457 				    void (*)(ErlOffHeap *, void *),
458 				    void *);
459 
460 static SWord db_delete_all_objects_tree(Process* p,
461                                         DbTable* tbl,
462                                         SWord reds,
463                                         Eterm* nitems_holder_wb);
464 static Eterm db_delete_all_objects_get_nitems_from_holder_tree(Process* p,
465                                                                Eterm nitems_holder);
466 #ifdef HARDDEBUG
467 static void db_check_table_tree(DbTable *tbl);
468 #endif
469 static int
470 db_lookup_dbterm_tree(Process *, DbTable *, Eterm key, Eterm obj,
471                       DbUpdateHandle*);
472 static void
473 db_finalize_dbterm_tree(int cret, DbUpdateHandle *);
474 static int db_get_binary_info_tree(Process*, DbTable*, Eterm key, Eterm *ret);
475 static int db_put_dbterm_tree(DbTable* tbl, /* [in out] */
476                               void* obj,
477                               int key_clash_fail,
478                               SWord *consumed_reds_p);
479 
480 /*
481 ** Static variables
482 */
483 
484 Export ets_select_reverse_exp;
485 
486 /*
487 ** External interface
488 */
489 DbTableMethod db_tree =
490 {
491     db_create_tree,
492     db_first_tree,
493     db_next_tree,
494     db_last_tree,
495     db_prev_tree,
496     db_put_tree,
497     db_get_tree,
498     db_get_element_tree,
499     db_member_tree,
500     db_erase_tree,
501     db_erase_object_tree,
502     db_slot_tree,
503     db_select_chunk_tree,
504     db_select_tree, /* why not chunk size=0 ??? */
505     db_select_delete_tree,
506     db_select_continue_tree,
507     db_select_delete_continue_tree,
508     db_select_count_tree,
509     db_select_count_continue_tree,
510     db_select_replace_tree,
511     db_select_replace_continue_tree,
512     db_take_tree,
513     db_delete_all_objects_tree,
514     db_delete_all_objects_get_nitems_from_holder_tree,
515     db_free_empty_table_tree,
516     db_free_table_continue_tree,
517     db_print_tree,
518     db_foreach_offheap_tree,
519     db_lookup_dbterm_tree,
520     db_finalize_dbterm_tree,
521     db_eterm_to_dbterm_tree_common,
522     db_dbterm_list_prepend_tree_common,
523     db_dbterm_list_remove_first_tree_common,
524     db_put_dbterm_tree,
525     db_free_dbterm_tree_common,
526     db_get_dbterm_key_tree_common,
527     db_get_binary_info_tree,
528     db_first_tree, /* raw_first same as first */
529     db_next_tree   /* raw_next same as next */
530 };
531 
532 
533 
534 
535 
db_initialize_tree(void)536 void db_initialize_tree(void)
537 {
538     erts_init_trap_export(&ets_select_reverse_exp, am_ets, am_reverse, 3,
539 			  &ets_select_reverse);
540     return;
541 };
542 
543 /*
544 ** Table interface routines ie what's called by the bif's
545 */
546 
db_create_tree(Process * p,DbTable * tbl)547 int db_create_tree(Process *p, DbTable *tbl)
548 {
549     DbTableTree *tb = &tbl->tree;
550     tb->root = NULL;
551     tb->static_stack.array = erts_db_alloc(ERTS_ALC_T_DB_STK,
552 					   (DbTable *) tb,
553 					   sizeof(TreeDbTerm *) * STACK_NEED);
554     tb->static_stack.pos = 0;
555     tb->static_stack.slot = 0;
556     erts_atomic_init_nob(&tb->is_stack_busy, 0);
557     tb->deletion = 0;
558     return DB_ERROR_NONE;
559 }
560 
db_first_tree_common(Process * p,DbTable * tbl,TreeDbTerm * root,Eterm * ret,DbTableTree * stack_container)561 int db_first_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
562                          Eterm *ret, DbTableTree *stack_container)
563 {
564     DbTreeStack* stack;
565     TreeDbTerm *this;
566 
567     if (( this = root ) == NULL) {
568 	*ret = am_EOT;
569 	return DB_ERROR_NONE;
570     }
571     /* Walk down the tree to the left */
572     if ((stack = get_static_stack(stack_container)) != NULL) {
573 	stack->pos = stack->slot = 0;
574     }
575     while (this->left != NULL) {
576 	if (stack) PUSH_NODE(stack, this);
577 	this = this->left;
578     }
579     if (stack) {
580 	PUSH_NODE(stack, this);
581 	stack->slot = 1;
582 	release_stack(tbl,stack_container,stack);
583     }
584     *ret = db_copy_key(p, tbl, &this->dbterm);
585     return DB_ERROR_NONE;
586 }
587 
db_first_tree(Process * p,DbTable * tbl,Eterm * ret)588 static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret)
589 {
590     DbTableTree *tb = &tbl->tree;
591     return db_first_tree_common(p, tbl, tb->root, ret, tb);
592 }
593 
db_next_tree_common(Process * p,DbTable * tbl,TreeDbTerm * root,Eterm key,Eterm * ret,DbTreeStack * stack)594 int db_next_tree_common(Process *p, DbTable *tbl,
595                         TreeDbTerm *root, Eterm key,
596                         Eterm *ret, DbTreeStack* stack)
597 {
598     TreeDbTerm *this;
599 
600     if (key == am_EOT)
601 	return DB_ERROR_BADKEY;
602     this = find_next(&tbl->common, root, stack, key);
603     if (this == NULL) {
604 	*ret = am_EOT;
605 	return DB_ERROR_NONE;
606     }
607     *ret = db_copy_key(p, tbl, &this->dbterm);
608     return DB_ERROR_NONE;
609 }
610 
db_next_tree(Process * p,DbTable * tbl,Eterm key,Eterm * ret)611 static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
612 {
613     DbTableTree *tb = &tbl->tree;
614     DbTreeStack* stack = get_any_stack(tbl, tb);
615     int ret_val = db_next_tree_common(p, tbl, tb->root, key, ret, stack);
616     release_stack(tbl,tb,stack);
617     return ret_val;
618 }
619 
db_last_tree_common(Process * p,DbTable * tbl,TreeDbTerm * root,Eterm * ret,DbTableTree * stack_container)620 int db_last_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
621                         Eterm *ret, DbTableTree *stack_container)
622 {
623     TreeDbTerm *this;
624     DbTreeStack* stack;
625 
626     if (( this = root ) == NULL) {
627 	*ret = am_EOT;
628 	return DB_ERROR_NONE;
629     }
630     /* Walk down the tree to the right */
631     if ((stack = get_static_stack(stack_container)) != NULL) {
632 	stack->pos = stack->slot = 0;
633     }
634     while (this->right != NULL) {
635 	if (stack) PUSH_NODE(stack, this);
636 	this = this->right;
637     }
638     if (stack) {
639 	PUSH_NODE(stack, this);
640         /* Always centralized counters when static stack is used */
641 	stack->slot = NITEMS_CENTRALIZED(tbl);
642 	release_stack(tbl,stack_container,stack);
643     }
644     *ret = db_copy_key(p, tbl, &this->dbterm);
645     return DB_ERROR_NONE;
646 }
647 
db_last_tree(Process * p,DbTable * tbl,Eterm * ret)648 static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret)
649 {
650     DbTableTree *tb = &tbl->tree;
651     return db_last_tree_common(p, tbl, tb->root, ret, tb);
652 }
653 
db_prev_tree_common(Process * p,DbTable * tbl,TreeDbTerm * root,Eterm key,Eterm * ret,DbTreeStack * stack)654 int db_prev_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, Eterm key,
655                  Eterm *ret, DbTreeStack* stack)
656 {
657     TreeDbTerm *this;
658 
659     if (key == am_EOT)
660 	return DB_ERROR_BADKEY;
661     this = find_prev(&tbl->common, root, stack, key);
662     if (this == NULL) {
663 	*ret = am_EOT;
664 	return DB_ERROR_NONE;
665     }
666     *ret = db_copy_key(p, tbl, &this->dbterm);
667     return DB_ERROR_NONE;
668 }
669 
db_prev_tree(Process * p,DbTable * tbl,Eterm key,Eterm * ret)670 static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
671 {
672     DbTableTree *tb = &tbl->tree;
673     DbTreeStack* stack = get_any_stack(tbl, tb);
674     int res = db_prev_tree_common(p, tbl, tb->root, key, ret, stack);
675     release_stack(tbl,tb,stack);
676     return res;
677 }
678 
cmp_key_eq(DbTableCommon * tb,Eterm key,TreeDbTerm * obj)679 static ERTS_INLINE int cmp_key_eq(DbTableCommon* tb, Eterm key, TreeDbTerm* obj) {
680     Eterm obj_key = GETKEY(tb,obj->dbterm.tpl);
681     return is_same(key, obj_key) || CMP(key, obj_key) == 0;
682 }
683 
684 /*
685  * This function differ to db_put_tree_common in that it inserts a TreeDbTerm
686  * instead of an Eterm.
687  */
db_put_dbterm_tree_common(DbTableCommon * tb,TreeDbTerm ** root,TreeDbTerm * value_to_insert,int key_clash_fail,DbTableTree * stack_container)688 int db_put_dbterm_tree_common(DbTableCommon *tb,
689                               TreeDbTerm **root,
690                               TreeDbTerm *value_to_insert,
691                               int key_clash_fail,
692                               DbTableTree *stack_container)
693 {
694     /* Non recursive insertion in AVL tree, building our own stack */
695     TreeDbTerm **tstack[STACK_NEED];
696     int tpos = 0;
697     int dstack[STACK_NEED+1];
698     int dpos = 0;
699     int state = 0;
700     TreeDbTerm **this = root;
701     Sint c;
702     Eterm key;
703     int dir;
704     TreeDbTerm *p1, *p2, *p;
705     Uint size_to_insert = db_term_size((DbTable*)tb, value_to_insert, offsetof(TreeDbTerm, dbterm));
706     ERTS_DB_ALC_MEM_UPDATE_((DbTable*)tb, 0, size_to_insert);
707     key = GETKEY(tb, value_to_insert->dbterm.tpl);
708 
709     reset_static_stack(stack_container);
710 
711     dstack[dpos++] = DIR_END;
712     for (;;)
713 	if (!*this) { /* Found our place */
714 	    state = 1;
715             INC_NITEMS(((DbTable*)tb));
716 	    *this = value_to_insert;
717 	    (*this)->balance = 0;
718 	    (*this)->left = (*this)->right = NULL;
719 	    break;
720 	} else if ((c = cmp_key(tb, key, *this)) < 0) {
721 	    /* go lefts */
722 	    dstack[dpos++] = DIR_LEFT;
723 	    tstack[tpos++] = this;
724 	    this = &((*this)->left);
725 	} else if (c > 0) { /* go right */
726 	    dstack[dpos++] = DIR_RIGHT;
727 	    tstack[tpos++] = this;
728 	    this = &((*this)->right);
729 	} else if (!key_clash_fail) { /* Equal key and this is a set, replace. */
730             value_to_insert->balance = (*this)->balance;
731             value_to_insert->left = (*this)->left;
732             value_to_insert->right = (*this)->right;
733             free_term((DbTable*)tb, *this);
734             *this = value_to_insert;
735 	    break;
736 	} else {
737 	    return DB_ERROR_BADKEY; /* key already exists */
738 	}
739 
740     while (state && ( dir = dstack[--dpos] ) != DIR_END) {
741 	this = tstack[--tpos];
742 	p = *this;
743 	if (dir == DIR_LEFT) {
744 	    switch (p->balance) {
745 	    case 1:
746 		p->balance = 0;
747 		state = 0;
748 		break;
749 	    case 0:
750 		p->balance = -1;
751 		break;
752 	    case -1: /* The icky case */
753 		p1 = p->left;
754 		if (p1->balance == -1) { /* Single LL rotation */
755 		    p->left = p1->right;
756 		    p1->right = p;
757 		    p->balance = 0;
758 		    (*this) = p1;
759 		} else { /* Double RR rotation */
760                     ASSERT(p1->right);
761 		    p2 = p1->right;
762 		    p1->right = p2->left;
763 		    p2->left = p1;
764 		    p->left = p2->right;
765 		    p2->right = p;
766 		    p->balance = (p2->balance == -1) ? +1 : 0;
767 		    p1->balance = (p2->balance == 1) ? -1 : 0;
768 		    (*this) = p2;
769 		}
770 		(*this)->balance = 0;
771 		state = 0;
772 		break;
773 	    }
774 	} else { /* dir == DIR_RIGHT */
775 	    switch (p->balance) {
776 	    case -1:
777 		p->balance = 0;
778 		state = 0;
779 		break;
780 	    case 0:
781 		p->balance = 1;
782 		break;
783 	    case 1:
784 		p1 = p->right;
785 		if (p1->balance == 1) { /* Single RR rotation */
786 		    p->right = p1->left;
787 		    p1->left = p;
788 		    p->balance = 0;
789 		    (*this) = p1;
790 		} else { /* Double RL rotation */
791                     ASSERT(p1->left);
792 		    p2 = p1->left;
793 		    p1->left = p2->right;
794 		    p2->right = p1;
795 		    p->right = p2->left;
796 		    p2->left = p;
797 		    p->balance = (p2->balance == 1) ? -1 : 0;
798 		    p1->balance = (p2->balance == -1) ? 1 : 0;
799 		    (*this) = p2;
800 		}
801 		(*this)->balance = 0;
802 		state = 0;
803 		break;
804 	    }
805 	}
806     }
807     return DB_ERROR_NONE;
808 }
809 
db_put_dbterm_tree(DbTable * tbl,void * obj,int key_clash_fail,SWord * consumed_reds_p)810 static int db_put_dbterm_tree(DbTable* tbl, /* [in out] */
811                               void* obj,
812                               int key_clash_fail, /* DB_ERROR_BADKEY if key exists */
813                               SWord *consumed_reds_p)
814 {
815     DbTableTree *tb = &tbl->tree;
816     return db_put_dbterm_tree_common(&tb->common, &tb->root, obj, key_clash_fail, tb);
817 }
818 
db_put_tree_common(DbTableCommon * tb,TreeDbTerm ** root,Eterm obj,int key_clash_fail,DbTableTree * stack_container)819 int db_put_tree_common(DbTableCommon *tb, TreeDbTerm **root, Eterm obj,
820                        int key_clash_fail, DbTableTree *stack_container)
821 {
822     /* Non recursive insertion in AVL tree, building our own stack */
823     TreeDbTerm **tstack[STACK_NEED];
824     int tpos = 0;
825     int dstack[STACK_NEED+1];
826     int dpos = 0;
827     int state = 0;
828     TreeDbTerm **this = root;
829     Sint c;
830     Eterm key;
831     int dir;
832     TreeDbTerm *p1, *p2, *p;
833 
834     key = GETKEY(tb, tuple_val(obj));
835 
836     reset_static_stack(stack_container);
837 
838     dstack[dpos++] = DIR_END;
839     for (;;)
840 	if (!*this) { /* Found our place */
841 	    state = 1;
842             INC_NITEMS(((DbTable*)tb));
843 	    *this = new_dbterm(tb, obj);
844 	    (*this)->balance = 0;
845 	    (*this)->left = (*this)->right = NULL;
846 	    break;
847 	} else if ((c = cmp_key(tb, key, *this)) < 0) {
848 	    /* go lefts */
849 	    dstack[dpos++] = DIR_LEFT;
850 	    tstack[tpos++] = this;
851 	    this = &((*this)->left);
852 	} else if (c > 0) { /* go right */
853 	    dstack[dpos++] = DIR_RIGHT;
854 	    tstack[tpos++] = this;
855 	    this = &((*this)->right);
856 	} else if (!key_clash_fail) { /* Equal key and this is a set, replace. */
857 	    *this = replace_dbterm(tb, *this, obj);
858 	    break;
859 	} else {
860 	    return DB_ERROR_BADKEY; /* key already exists */
861 	}
862 
863     while (state && ( dir = dstack[--dpos] ) != DIR_END) {
864 	this = tstack[--tpos];
865 	p = *this;
866 	if (dir == DIR_LEFT) {
867 	    switch (p->balance) {
868 	    case 1:
869 		p->balance = 0;
870 		state = 0;
871 		break;
872 	    case 0:
873 		p->balance = -1;
874 		break;
875 	    case -1: /* The icky case */
876 		p1 = p->left;
877 		if (p1->balance == -1) { /* Single LL rotation */
878 		    p->left = p1->right;
879 		    p1->right = p;
880 		    p->balance = 0;
881 		    (*this) = p1;
882 		} else { /* Double RR rotation */
883                     ASSERT(p1->right);
884 		    p2 = p1->right;
885 		    p1->right = p2->left;
886 		    p2->left = p1;
887 		    p->left = p2->right;
888 		    p2->right = p;
889 		    p->balance = (p2->balance == -1) ? +1 : 0;
890 		    p1->balance = (p2->balance == 1) ? -1 : 0;
891 		    (*this) = p2;
892 		}
893 		(*this)->balance = 0;
894 		state = 0;
895 		break;
896 	    }
897 	} else { /* dir == DIR_RIGHT */
898 	    switch (p->balance) {
899 	    case -1:
900 		p->balance = 0;
901 		state = 0;
902 		break;
903 	    case 0:
904 		p->balance = 1;
905 		break;
906 	    case 1:
907 		p1 = p->right;
908 		if (p1->balance == 1) { /* Single RR rotation */
909 		    p->right = p1->left;
910 		    p1->left = p;
911 		    p->balance = 0;
912 		    (*this) = p1;
913 		} else { /* Double RL rotation */
914                     ASSERT(p1->left);
915 		    p2 = p1->left;
916 		    p1->left = p2->right;
917 		    p2->right = p1;
918 		    p->right = p2->left;
919 		    p2->left = p;
920 		    p->balance = (p2->balance == 1) ? -1 : 0;
921 		    p1->balance = (p2->balance == -1) ? 1 : 0;
922 		    (*this) = p2;
923 		}
924 		(*this)->balance = 0;
925 		state = 0;
926 		break;
927 	    }
928 	}
929     }
930     return DB_ERROR_NONE;
931 }
932 
db_put_tree(DbTable * tbl,Eterm obj,int key_clash_fail,SWord * consumed_reds_p)933 static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail,
934                        SWord *consumed_reds_p)
935 {
936     DbTableTree *tb = &tbl->tree;
937     return db_put_tree_common(&tb->common, &tb->root, obj, key_clash_fail, tb);
938 }
939 
db_get_tree_common(Process * p,DbTableCommon * tb,TreeDbTerm * root,Eterm key,Eterm * ret,DbTableTree * stack_container)940 int db_get_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key,
941                        Eterm *ret, DbTableTree *stack_container)
942 {
943     Eterm copy;
944     Eterm *hp, *hend;
945     TreeDbTerm *this;
946 
947     /*
948      * This is always a set, so we know exactly how large
949      * the data is when we have found it.
950      * The list created around it is purely for interface conformance.
951      */
952 
953     this = find_node(tb,root,key,stack_container);
954     if (this == NULL) {
955 	*ret = NIL;
956     } else {
957 	hp = HAlloc(p, this->dbterm.size + 2);
958 	hend = hp + this->dbterm.size + 2;
959 	copy = db_copy_object_from_ets(tb, &this->dbterm, &hp, &MSO(p));
960 	*ret = CONS(hp, copy, NIL);
961 	hp += 2;
962 	HRelease(p,hend,hp);
963     }
964     return DB_ERROR_NONE;
965 }
966 
db_get_tree(Process * p,DbTable * tbl,Eterm key,Eterm * ret)967 static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
968 {
969     DbTableTree *tb = &tbl->tree;
970     return db_get_tree_common(p, &tb->common, tb->root, key, ret, tb);
971 }
972 
db_member_tree_common(DbTableCommon * tb,TreeDbTerm * root,Eterm key,Eterm * ret,DbTableTree * stack_container)973 int db_member_tree_common(DbTableCommon *tb, TreeDbTerm *root, Eterm key, Eterm *ret,
974                           DbTableTree *stack_container)
975 {
976     *ret = (find_node(tb,root,key,stack_container) == NULL) ? am_false : am_true;
977     return DB_ERROR_NONE;
978 }
979 
db_member_tree(DbTable * tbl,Eterm key,Eterm * ret)980 static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret)
981 {
982     DbTableTree *tb = &tbl->tree;
983     return db_member_tree_common(&tb->common, tb->root, key, ret, tb);
984 }
985 
db_get_element_tree_common(Process * p,DbTableCommon * tb,TreeDbTerm * root,Eterm key,int ndex,Eterm * ret,DbTableTree * stack_container)986 int db_get_element_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key,
987                                int ndex, Eterm *ret, DbTableTree *stack_container)
988 {
989     /*
990      * Look the node up:
991      */
992     Eterm *hp;
993     TreeDbTerm *this;
994 
995     /*
996      * This is always a set, so we know exactly how large
997      * the data is when we have found it.
998      * No list is created around elements in set's so there are no list
999      * around the element here either.
1000      */
1001 
1002     this = find_node(tb,root,key,stack_container);
1003     if (this == NULL) {
1004 	return DB_ERROR_BADKEY;
1005     } else {
1006 	if (ndex > arityval(this->dbterm.tpl[0])) {
1007 	    return DB_ERROR_BADPARAM;
1008 	}
1009 	*ret = db_copy_element_from_ets(tb, p, &this->dbterm, ndex, &hp, 0);
1010     }
1011     return DB_ERROR_NONE;
1012 }
1013 
db_get_element_tree(Process * p,DbTable * tbl,Eterm key,int ndex,Eterm * ret)1014 static int db_get_element_tree(Process *p, DbTable *tbl,
1015 			       Eterm key, int ndex, Eterm *ret)
1016 {
1017     DbTableTree *tb = &tbl->tree;
1018     return db_get_element_tree_common(p, &tb->common, tb->root, key,
1019                                       ndex, ret, tb);
1020 }
1021 
db_erase_tree_common(DbTable * tbl,TreeDbTerm ** root,Eterm key,Eterm * ret,DbTreeStack * stack)1022 int db_erase_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm key, Eterm *ret,
1023                          DbTreeStack *stack /* NULL if no static stack */)
1024 {
1025     TreeDbTerm *res;
1026 
1027     *ret = am_true;
1028 
1029     if ((res = linkout_tree(&tbl->common, root,key, stack)) != NULL) {
1030 	free_term(tbl, res);
1031     }
1032     return DB_ERROR_NONE;
1033 }
1034 
db_erase_tree(DbTable * tbl,Eterm key,Eterm * ret)1035 static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret)
1036 {
1037     DbTableTree *tb = &tbl->tree;
1038     return db_erase_tree_common(tbl, &tb->root, key, ret, &tb->static_stack);
1039 }
1040 
db_erase_object_tree_common(DbTable * tbl,TreeDbTerm ** root,Eterm object,Eterm * ret,DbTableTree * stack_container)1041 int db_erase_object_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm object,
1042                                 Eterm *ret, DbTableTree *stack_container)
1043 {
1044     TreeDbTerm *res;
1045 
1046     *ret = am_true;
1047 
1048     if ((res = linkout_object_tree(&tbl->common, root, object, stack_container)) != NULL) {
1049 	free_term(tbl, res);
1050     }
1051     return DB_ERROR_NONE;
1052 }
1053 
db_erase_object_tree(DbTable * tbl,Eterm object,Eterm * ret)1054 static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret)
1055 {
1056     DbTableTree *tb = &tbl->tree;
1057     return  db_erase_object_tree_common(tbl, &tb->root, object, ret, tb);
1058 }
1059 
db_slot_tree_common(Process * p,DbTable * tbl,TreeDbTerm * root,Eterm slot_term,Eterm * ret,DbTableTree * stack_container,CATreeRootIterator * iter)1060 int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
1061                         Eterm slot_term, Eterm *ret,
1062                         DbTableTree *stack_container,
1063                         CATreeRootIterator *iter)
1064 {
1065     Sint slot;
1066     TreeDbTerm *st;
1067     Eterm *hp, *hend;
1068     Eterm copy;
1069     int is_EOT = 0;
1070     /*
1071      * The notion of a "slot" is not natural in a tree, but we try to
1072      * simulate it by giving the n'th node in the tree instead.
1073      * Traversing a tree in this way is not very convenient, but by
1074      * using the saved stack we at least sometimes will get acceptable
1075      * performance.
1076      */
1077 
1078     if (is_not_small(slot_term) ||
1079 	((slot = signed_val(slot_term)) < 0) ||
1080 	(IS_CENTRALIZED_CTR(tbl) && slot > NITEMS_CENTRALIZED(tbl)))
1081 	return DB_ERROR_BADPARAM;
1082 
1083     if (IS_CENTRALIZED_CTR(tbl) && slot == NITEMS_CENTRALIZED(tbl)) {
1084 	*ret = am_EOT;
1085 	return DB_ERROR_NONE;
1086     }
1087 
1088     /*
1089      * We use the slot position and search from there, slot positions
1090      * are counted from 1 and up.
1091      */
1092     ++slot;
1093     st = slot_search(p, root, slot, tbl, stack_container, iter, &is_EOT);
1094     if (is_EOT) {
1095         *ret = am_EOT;
1096 	return DB_ERROR_NONE;
1097     }
1098     if (st == NULL) {
1099 	*ret = am_false;
1100 	return DB_ERROR_UNSPEC;
1101     }
1102     hp = HAlloc(p, st->dbterm.size + 2);
1103     hend = hp + st->dbterm.size + 2;
1104     copy = db_copy_object_from_ets(&tbl->common, &st->dbterm, &hp, &MSO(p));
1105     *ret = CONS(hp, copy, NIL);
1106     hp += 2;
1107     HRelease(p,hend,hp);
1108     return DB_ERROR_NONE;
1109 }
1110 
db_slot_tree(Process * p,DbTable * tbl,Eterm slot_term,Eterm * ret)1111 static int db_slot_tree(Process *p, DbTable *tbl,
1112 			Eterm slot_term, Eterm *ret)
1113 {
1114     DbTableTree *tb = &tbl->tree;
1115     return db_slot_tree_common(p, tbl, tb->root, slot_term, ret, tb, NULL);
1116 }
1117 
1118 
1119 
ets_select_reverse(BIF_ALIST_3)1120 static BIF_RETTYPE ets_select_reverse(BIF_ALIST_3)
1121 {
1122     Process *p = BIF_P;
1123     Eterm a1 = BIF_ARG_1;
1124     Eterm a2 = BIF_ARG_2;
1125     Eterm a3 = BIF_ARG_3;
1126     Eterm list;
1127     Eterm result;
1128     Eterm* hp;
1129     Eterm* hend;
1130 
1131     int max_iter = CONTEXT_REDS * 10;
1132 
1133     if (is_nil(a1)) {
1134 	hp = HAlloc(p, 3);
1135 	BIF_RET(TUPLE2(hp,a2,a3));
1136     } else if (is_not_list(a1)) {
1137     error:
1138 	BIF_ERROR(p, BADARG);
1139     }
1140 
1141     list = a1;
1142     result = a2;
1143     hp = hend = NULL;
1144     while (is_list(list)) {
1145 	Eterm* pair = list_val(list);
1146 	if (--max_iter == 0) {
1147 	    BUMP_ALL_REDS(p);
1148 	    HRelease(p, hend, hp);
1149 	    BIF_TRAP3(&ets_select_reverse_exp, p, list, result, a3);
1150 	}
1151 	if (hp == hend) {
1152 	    hp = HAlloc(p, 64);
1153 	    hend = hp + 64;
1154 	}
1155 	result = CONS(hp, CAR(pair), result);
1156 	hp += 2;
1157 	list = CDR(pair);
1158     }
1159     if (is_not_nil(list))  {
1160 	goto error;
1161     }
1162     HRelease(p, hend, hp);
1163     BUMP_REDS(p,CONTEXT_REDS - max_iter / 10);
1164     hp = HAlloc(p,3);
1165     BIF_RET(TUPLE2(hp, result, a3));
1166 }
1167 
bif_trap1(Export * bif,Process * p,Eterm p1)1168 static BIF_RETTYPE bif_trap1(Export *bif,
1169 			     Process *p,
1170 			     Eterm p1)
1171 {
1172     BIF_TRAP1(bif, p, p1);
1173 }
1174 
bif_trap3(Export * bif,Process * p,Eterm p1,Eterm p2,Eterm p3)1175 static BIF_RETTYPE bif_trap3(Export *bif,
1176 			     Process *p,
1177 			     Eterm p1,
1178 			     Eterm p2,
1179 			     Eterm p3)
1180 {
1181     BIF_TRAP3(bif, p, p1, p2, p3);
1182 }
1183 
db_select_continue_tree_common(Process * p,DbTableCommon * tb,Eterm continuation,Eterm * ret,DbTableTree * stack_container,CATreeRootIterator * iter)1184 int db_select_continue_tree_common(Process *p,
1185                                    DbTableCommon *tb,
1186                                    Eterm continuation,
1187                                    Eterm *ret,
1188                                    DbTableTree *stack_container,
1189                                    CATreeRootIterator* iter)
1190 {
1191     DbTreeStack* stack;
1192     struct select_context sc;
1193     unsigned sz;
1194     Eterm *hp;
1195     Eterm lastkey;
1196     Eterm end_condition;
1197     Binary *mp;
1198     Eterm key;
1199     Eterm *tptr;
1200     Sint chunk_size;
1201     Sint reverse;
1202 
1203 #define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
1204 
1205     /* Decode continuation. We know it's a tuple but not the arity or
1206        anything else */
1207 
1208     tptr = tuple_val(continuation);
1209 
1210     if (arityval(*tptr) != 8)
1211 	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
1212 
1213     if (!is_small(tptr[4]) ||
1214 	!(is_list(tptr[6]) || tptr[6] == NIL) || !is_small(tptr[7]) ||
1215 	!is_small(tptr[8]))
1216 	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
1217 
1218     lastkey = tptr[2];
1219     end_condition = tptr[3];
1220     mp = erts_db_get_match_prog_binary(tptr[5]);
1221     if (!mp)
1222 	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
1223     chunk_size = signed_val(tptr[4]);
1224 
1225     sc.p = p;
1226     sc.accum = tptr[6];
1227     sc.mp = mp;
1228     sc.end_condition = NIL;
1229     sc.lastobj = NULL;
1230     sc.max = 1000;
1231     sc.keypos = tb->keypos;
1232     sc.chunk_size = chunk_size;
1233     reverse = unsigned_val(tptr[7]);
1234     sc.got = signed_val(tptr[8]);
1235 
1236     if (iter) {
1237         iter->next_route_key = lastkey;
1238         sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size, NULL);
1239     }
1240     else
1241         sc.common.root = &((DbTableTree*)tb)->root;
1242 
1243     if (sc.common.root) {
1244         stack = get_any_stack((DbTable*)tb, stack_container);
1245         if (chunk_size) {
1246             if (reverse) {
1247                 traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter);
1248             } else {
1249                 traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter);
1250             }
1251         } else {
1252             if (reverse) {
1253                 traverse_forward(tb, stack, lastkey, &doit_select, &sc.common, iter);
1254             } else {
1255                 traverse_backwards(tb, stack, lastkey, &doit_select, &sc.common, iter);
1256             }
1257         }
1258         release_stack((DbTable*)tb,stack_container,stack);
1259 
1260         BUMP_REDS(p, 1000 - sc.max);
1261     }
1262 
1263     if (sc.max > 0 || (chunk_size && sc.got == chunk_size)) {
1264 	if (chunk_size) {
1265 	    Eterm *hp;
1266 	    unsigned sz;
1267 
1268 	    if (sc.got < chunk_size || sc.lastobj == NULL) {
1269 		/* end of table, sc.lastobj may be NULL as we may have been
1270 		   at the very last object in the table when trapping. */
1271 		if (!sc.got) {
1272 		    RET_TO_BIF(am_EOT, DB_ERROR_NONE);
1273 		} else {
1274 		    RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
1275 					 sc.accum, NIL, am_EOT),
1276 			       DB_ERROR_NONE);
1277 		}
1278 	    }
1279 
1280 	    key = GETKEY(tb, sc.lastobj);
1281 	    sz = size_object(key);
1282 	    hp = HAlloc(p, 9 + sz);
1283 	    key = copy_struct(key, sz, &hp, &MSO(p));
1284 	    continuation = TUPLE8
1285 		(hp,
1286 		 tptr[1],
1287 		 key,
1288 		 tptr[3],
1289 		 tptr[4],
1290 		 tptr[5],
1291 		 NIL,
1292 		 tptr[7],
1293 		 make_small(0));
1294 	    RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
1295 				 sc.accum, NIL, continuation),
1296 		       DB_ERROR_NONE);
1297 	} else {
1298 	    RET_TO_BIF(sc.accum, DB_ERROR_NONE);
1299 	}
1300     }
1301     key = GETKEY(tb, sc.lastobj);
1302     if (chunk_size) {
1303 	if (end_condition != NIL &&
1304 	    ((!reverse && cmp_partly_bound(end_condition,key) < 0) ||
1305 	     (reverse && cmp_partly_bound(end_condition,key) > 0))) {
1306 	    /* done anyway */
1307 	    if (!sc.got) {
1308 		RET_TO_BIF(am_EOT, DB_ERROR_NONE);
1309 	    } else {
1310 		RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
1311 				     sc.accum, NIL, am_EOT),
1312 			   DB_ERROR_NONE);
1313 	    }
1314 	}
1315     } else {
1316 	if (end_condition != NIL &&
1317 	    ((!reverse && cmp_partly_bound(end_condition,key) > 0) ||
1318 	     (reverse && cmp_partly_bound(end_condition,key) < 0))) {
1319 	    /* done anyway */
1320 	    RET_TO_BIF(sc.accum,DB_ERROR_NONE);
1321 	}
1322     }
1323     /* Not done yet, let's trap. */
1324     sz = size_object(key);
1325     hp = HAlloc(p, 9 + sz);
1326     key = copy_struct(key, sz, &hp, &MSO(p));
1327     continuation = TUPLE8
1328 	(hp,
1329 	 tptr[1],
1330 	 key,
1331 	 tptr[3],
1332 	 tptr[4],
1333 	 tptr[5],
1334 	 sc.accum,
1335 	 tptr[7],
1336 	 make_small(sc.got));
1337     RET_TO_BIF(bif_trap1(BIF_TRAP_EXPORT(BIF_ets_select_1), p, continuation),
1338 	       DB_ERROR_NONE);
1339 
1340 #undef RET_TO_BIF
1341 }
1342 
1343 /*
1344 ** This is called either when the select bif traps or when ets:select/1
1345 ** is called. It does mostly the same as db_select_tree and may in either case
1346 ** trap to itself again (via the ets:select/1 bif).
1347 ** Note that this is common for db_select_tree and db_select_chunk_tree.
1348 */
db_select_continue_tree(Process * p,DbTable * tbl,Eterm continuation,Eterm * ret,enum DbIterSafety * safety_p)1349 static int db_select_continue_tree(Process *p,
1350 				   DbTable *tbl,
1351 				   Eterm continuation,
1352 				   Eterm *ret,
1353                                    enum DbIterSafety* safety_p)
1354 {
1355     DbTableTree *tb = &tbl->tree;
1356     return db_select_continue_tree_common(p, &tb->common,
1357                                           continuation, ret, tb, NULL);
1358 }
1359 
db_select_tree_common(Process * p,DbTable * tb,Eterm tid,Eterm pattern,int reverse,Eterm * ret,DbTableTree * stack_container,CATreeRootIterator * iter)1360 int db_select_tree_common(Process *p, DbTable *tb,
1361                           Eterm tid, Eterm pattern, int reverse, Eterm *ret,
1362                           DbTableTree *stack_container,
1363                           CATreeRootIterator* iter)
1364 {
1365     /* Strategy: Traverse backwards to build resulting list from tail to head */
1366     DbTreeStack* stack;
1367     struct select_context sc;
1368     struct mp_info mpi;
1369     Eterm lastkey = THE_NON_VALUE;
1370     Eterm key;
1371     Eterm continuation;
1372     unsigned sz;
1373     Eterm *hp;
1374     TreeDbTerm *this;
1375     int errcode;
1376     Eterm mpb;
1377 
1378 
1379 #define RET_TO_BIF(Term,RetVal) do { 	       	\
1380 	if (mpi.mp != NULL) {			\
1381 	    erts_bin_free(mpi.mp);       	\
1382 	}					\
1383 	*ret = (Term); 				\
1384 	return RetVal; 			        \
1385     } while(0)
1386 
1387     mpi.mp = NULL;
1388 
1389     sc.accum = NIL;
1390     sc.lastobj = NULL;
1391     sc.p = p;
1392     sc.max = 1000;
1393     sc.end_condition = NIL;
1394     sc.keypos = tb->common.keypos;
1395     sc.got = 0;
1396     sc.chunk_size = 0;
1397 
1398     if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
1399 	RET_TO_BIF(NIL,errcode);
1400     }
1401 
1402     if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
1403 	RET_TO_BIF(NIL,DB_ERROR_NONE);
1404 	/* can't possibly match anything */
1405     }
1406 
1407     sc.mp = mpi.mp;
1408 
1409     if (mpi.key_boundness == MS_KEY_BOUND) {
1410         ASSERT(CMP_EQ(mpi.least, mpi.most));
1411         if (iter)
1412             sc.common.root = catree_find_root(mpi.least, iter);
1413         else
1414             sc.common.root = &tb->tree.root;
1415         this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
1416         if (this)
1417             doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */);
1418 	RET_TO_BIF(sc.accum,DB_ERROR_NONE);
1419     }
1420 
1421     stack = get_any_stack((DbTable*)tb,stack_container);
1422     if (reverse) {
1423 	if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
1424             this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter);
1425 	    if (this)
1426 		lastkey = GETKEY(tb, this->dbterm.tpl);
1427 	    sc.end_condition = mpi.most;
1428 	}
1429         else {
1430             ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
1431             if (iter)
1432                 sc.common.root = catree_find_first_root(iter);
1433             else
1434                 sc.common.root = &tb->tree.root;
1435         }
1436 	traverse_forward(&tb->common, stack, lastkey, &doit_select, &sc.common, iter);
1437     } else {
1438 	if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
1439             this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
1440 	    if (this)
1441                 lastkey = GETKEY(tb, this->dbterm.tpl);
1442 	    sc.end_condition = mpi.least;
1443 	}
1444         else {
1445             ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
1446             if (iter)
1447                 sc.common.root = catree_find_last_root(iter);
1448             else
1449                 sc.common.root = &tb->tree.root;
1450         }
1451 	traverse_backwards(&tb->common, stack, lastkey, &doit_select, &sc.common, iter);
1452     }
1453     release_stack((DbTable*)tb,stack_container,stack);
1454 #ifdef HARDDEBUG
1455 	erts_fprintf(stderr,"Least: %T\n", mpi.least);
1456 	erts_fprintf(stderr,"Most: %T\n", mpi.most);
1457 #endif
1458     BUMP_REDS(p, 1000 - sc.max);
1459     if (sc.max > 0) {
1460 	RET_TO_BIF(sc.accum,DB_ERROR_NONE);
1461     }
1462 
1463     key = GETKEY(tb, sc.lastobj);
1464     sz = size_object(key);
1465     hp = HAlloc(p, 9 + sz + ERTS_MAGIC_REF_THING_SIZE);
1466     key = copy_struct(key, sz, &hp, &MSO(p));
1467     mpb= erts_db_make_match_prog_ref(p,mpi.mp,&hp);
1468 
1469     continuation = TUPLE8
1470 	(hp,
1471 	 tid,
1472 	 key,
1473 	 sc.end_condition, /* From the match program, needn't be copied */
1474 	 make_small(0), /* Chunk size of zero means not chunked to the
1475 			   continuation BIF */
1476 	 mpb,
1477 	 sc.accum,
1478 	 make_small(reverse),
1479 	 make_small(sc.got));
1480 
1481     /* Don't free mpi.mp, so don't use macro */
1482     *ret = bif_trap1(BIF_TRAP_EXPORT(BIF_ets_select_1), p, continuation);
1483     return DB_ERROR_NONE;
1484 
1485 #undef RET_TO_BIF
1486 
1487 }
1488 
db_select_tree(Process * p,DbTable * tbl,Eterm tid,Eterm pattern,int reverse,Eterm * ret,enum DbIterSafety safety)1489 static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
1490 			  Eterm pattern, int reverse, Eterm *ret,
1491                           enum DbIterSafety safety)
1492 {
1493     return db_select_tree_common(p, tbl, tid,
1494                                  pattern, reverse, ret, &tbl->tree, NULL);
1495 }
1496 
db_select_count_continue_tree_common(Process * p,DbTable * tb,Eterm continuation,Eterm * ret,DbTableTree * stack_container,CATreeRootIterator * iter)1497 int db_select_count_continue_tree_common(Process *p,
1498                                          DbTable *tb,
1499                                          Eterm continuation,
1500                                          Eterm *ret,
1501                                          DbTableTree *stack_container,
1502                                          CATreeRootIterator* iter)
1503 {
1504     DbTreeStack* stack;
1505     struct select_count_context sc;
1506     unsigned sz;
1507     Eterm *hp;
1508     Eterm lastkey;
1509     Eterm end_condition;
1510     Binary *mp;
1511     Eterm key;
1512     Eterm *tptr;
1513     Eterm egot;
1514 
1515 #define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
1516 
1517     /* Decode continuation. We know it's a tuple and everything else as
1518      this is only called by ourselves */
1519 
1520     /* continuation:
1521        {Table, Lastkey, EndCondition, MatchProgBin, HowManyGot}*/
1522 
1523     tptr = tuple_val(continuation);
1524 
1525     if (arityval(*tptr) != 5)
1526 	erts_exit(ERTS_ERROR_EXIT,"Internal error in ets:select_count/1");
1527 
1528     lastkey = tptr[2];
1529     end_condition = tptr[3];
1530     mp = erts_db_get_match_prog_binary(tptr[4]);
1531     if (!mp)
1532 	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
1533 
1534     sc.p = p;
1535     sc.mp = mp;
1536     sc.end_condition = NIL;
1537     sc.lastobj = NULL;
1538     sc.max = 1000;
1539     sc.keypos = tb->common.keypos;
1540     if (is_big(tptr[5])) {
1541 	sc.got = big_to_uint32(tptr[5]);
1542     } else {
1543 	sc.got = unsigned_val(tptr[5]);
1544     }
1545 
1546     if (iter) {
1547         iter->next_route_key = lastkey;
1548         sc.common.root = catree_find_prev_root(iter, NULL);
1549     }
1550     else {
1551         sc.common.root = &tb->tree.root;
1552     }
1553 
1554     if (sc.common.root) {
1555         stack = get_any_stack(tb, stack_container);
1556         traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter);
1557         release_stack(tb,stack_container,stack);
1558 
1559         BUMP_REDS(p, 1000 - sc.max);
1560     }
1561 
1562     if (sc.max > 0) {
1563 	RET_TO_BIF(erts_make_integer(sc.got,p), DB_ERROR_NONE);
1564     }
1565     key = GETKEY(tb, sc.lastobj);
1566     if (end_condition != NIL &&
1567 	(cmp_partly_bound(end_condition,key) > 0)) {
1568 	/* done anyway */
1569 	RET_TO_BIF(make_small(sc.got),DB_ERROR_NONE);
1570     }
1571     /* Not done yet, let's trap. */
1572     sz = size_object(key);
1573     if (IS_USMALL(0, sc.got)) {
1574 	hp = HAlloc(p, sz + 6);
1575 	egot = make_small(sc.got);
1576     }
1577     else {
1578 	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + 6);
1579 	egot = uint_to_big(sc.got, hp);
1580 	hp += BIG_UINT_HEAP_SIZE;
1581     }
1582     key = copy_struct(key, sz, &hp, &MSO(p));
1583     continuation = TUPLE5
1584 	(hp,
1585 	 tptr[1],
1586 	 key,
1587 	 tptr[3],
1588 	 tptr[4],
1589 	 egot);
1590     RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p, continuation),
1591 	       DB_ERROR_NONE);
1592 
1593 #undef RET_TO_BIF
1594 }
1595 
1596 /*
1597 ** This is called either when the select_count bif traps.
1598 */
db_select_count_continue_tree(Process * p,DbTable * tbl,Eterm continuation,Eterm * ret,enum DbIterSafety * safety_p)1599 static int db_select_count_continue_tree(Process *p,
1600                                          DbTable *tbl,
1601                                          Eterm continuation,
1602                                          Eterm *ret,
1603                                          enum DbIterSafety* safety_p)
1604 {
1605     DbTableTree *tb = &tbl->tree;
1606     return db_select_count_continue_tree_common(p, tbl,
1607                                                 continuation, ret, tb, NULL);
1608 }
1609 
1610 
db_select_count_tree_common(Process * p,DbTable * tb,Eterm tid,Eterm pattern,Eterm * ret,DbTableTree * stack_container,CATreeRootIterator * iter)1611 int db_select_count_tree_common(Process *p, DbTable *tb,
1612                                 Eterm tid, Eterm pattern, Eterm *ret,
1613                                 DbTableTree *stack_container,
1614                                 CATreeRootIterator* iter)
1615 {
1616     DbTreeStack* stack;
1617     struct select_count_context sc;
1618     struct mp_info mpi;
1619     Eterm lastkey = THE_NON_VALUE;
1620     Eterm key;
1621     Eterm continuation;
1622     unsigned sz;
1623     Eterm *hp;
1624     TreeDbTerm *this;
1625     int errcode;
1626     Eterm egot;
1627     Eterm mpb;
1628 
1629 #define RET_TO_BIF(Term,RetVal) do { 	       	\
1630 	if (mpi.mp != NULL) {			\
1631 	    erts_bin_free(mpi.mp);       	\
1632 	}					\
1633 	*ret = (Term); 				\
1634 	return RetVal; 			        \
1635     } while(0)
1636 
1637     mpi.mp = NULL;
1638 
1639     sc.lastobj = NULL;
1640     sc.p = p;
1641     sc.max = 1000;
1642     sc.end_condition = NIL;
1643     sc.keypos = tb->common.keypos;
1644     sc.got = 0;
1645 
1646     if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
1647 	RET_TO_BIF(NIL,errcode);
1648     }
1649 
1650     if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
1651 	RET_TO_BIF(make_small(0),DB_ERROR_NONE);
1652 	/* can't possibly match anything */
1653     }
1654 
1655     sc.mp = mpi.mp;
1656 
1657     if (mpi.key_boundness == MS_KEY_BOUND) {
1658         ASSERT(CMP_EQ(mpi.least, mpi.most));
1659         if (iter)
1660             sc.common.root = catree_find_root(mpi.least, iter);
1661         else
1662             sc.common.root = &((DbTable*)tb)->tree.root;
1663         this =  find_node(&tb->common, *sc.common.root, mpi.least, NULL);
1664         if (this)
1665             doit_select_count(&tb->common, this, &sc.common, 0 /* dummy */);
1666 	RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
1667     }
1668 
1669     stack = get_any_stack((DbTable*)tb, stack_container);
1670     if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
1671         this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
1672 	if (this)
1673             lastkey = GETKEY(tb, this->dbterm.tpl);
1674 	sc.end_condition = mpi.least;
1675     }
1676     else {
1677         ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
1678         if (iter)
1679             sc.common.root = catree_find_last_root(iter);
1680         else
1681             sc.common.root = &tb->tree.root;
1682     }
1683 
1684     traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter);
1685     release_stack((DbTable*)tb,stack_container,stack);
1686     BUMP_REDS(p, 1000 - sc.max);
1687     if (sc.max > 0) {
1688 	RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
1689     }
1690 
1691     key = GETKEY(tb, sc.lastobj);
1692     sz = size_object(key);
1693     if (IS_USMALL(0, sc.got)) {
1694 	hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6);
1695 	egot = make_small(sc.got);
1696     }
1697     else {
1698 	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + ERTS_MAGIC_REF_THING_SIZE + 6);
1699 	egot = uint_to_big(sc.got, hp);
1700 	hp += BIG_UINT_HEAP_SIZE;
1701     }
1702     key = copy_struct(key, sz, &hp, &MSO(p));
1703     mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp);
1704 
1705     continuation = TUPLE5
1706 	(hp,
1707 	 tid,
1708 	 key,
1709 	 sc.end_condition, /* From the match program, needn't be copied */
1710 	 mpb,
1711 	 egot);
1712 
1713     /* Don't free mpi.mp, so don't use macro */
1714     *ret = bif_trap1(&ets_select_count_continue_exp, p, continuation);
1715     return DB_ERROR_NONE;
1716 
1717 #undef RET_TO_BIF
1718 
1719 }
1720 
db_select_count_tree(Process * p,DbTable * tbl,Eterm tid,Eterm pattern,Eterm * ret,enum DbIterSafety safety)1721 static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
1722                                 Eterm pattern, Eterm *ret,
1723                                 enum DbIterSafety safety)
1724 {
1725     DbTableTree *tb = &tbl->tree;
1726     return db_select_count_tree_common(p, tbl,
1727                                        tid, pattern, ret, tb, NULL);
1728 }
1729 
1730 
db_select_chunk_tree_common(Process * p,DbTable * tb,Eterm tid,Eterm pattern,Sint chunk_size,int reverse,Eterm * ret,DbTableTree * stack_container,CATreeRootIterator * iter)1731 int db_select_chunk_tree_common(Process *p, DbTable *tb,
1732                                 Eterm tid, Eterm pattern, Sint chunk_size,
1733                                 int reverse, Eterm *ret,
1734                                 DbTableTree *stack_container,
1735                                 CATreeRootIterator* iter)
1736 {
1737     DbTreeStack* stack;
1738     struct select_context sc;
1739     struct mp_info mpi;
1740     Eterm lastkey = THE_NON_VALUE;
1741     Eterm key;
1742     Eterm continuation;
1743     unsigned sz;
1744     Eterm *hp;
1745     TreeDbTerm *this;
1746     int errcode;
1747     Eterm mpb;
1748 
1749 #define RET_TO_BIF(Term,RetVal) do { 		\
1750 	if (mpi.mp != NULL) {			\
1751 	    erts_bin_free(mpi.mp);		\
1752 	}					\
1753 	*ret = (Term); 				\
1754 	return RetVal; 			        \
1755     } while(0)
1756 
1757     mpi.mp = NULL;
1758 
1759     sc.accum = NIL;
1760     sc.lastobj = NULL;
1761     sc.p = p;
1762     sc.max = 1000;
1763     sc.end_condition = NIL;
1764     sc.keypos = tb->common.keypos;
1765     sc.got = 0;
1766     sc.chunk_size = chunk_size;
1767 
1768     if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
1769 	RET_TO_BIF(NIL,errcode);
1770     }
1771 
1772     if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
1773 	RET_TO_BIF(am_EOT,DB_ERROR_NONE);
1774 	/* can't possibly match anything */
1775     }
1776 
1777     sc.mp = mpi.mp;
1778 
1779     if (mpi.key_boundness == MS_KEY_BOUND) {
1780         ASSERT(CMP_EQ(mpi.least, mpi.most));
1781         if (iter)
1782             sc.common.root = catree_find_root(mpi.least, iter);
1783         else
1784             sc.common.root = &tb->tree.root;
1785         this =  find_node(&tb->common, *sc.common.root, mpi.least, NULL);
1786         if (this)
1787             doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */);
1788 	if (sc.accum != NIL) {
1789 	    hp=HAlloc(p, 3);
1790 	    RET_TO_BIF(TUPLE2(hp,sc.accum,am_EOT),DB_ERROR_NONE);
1791 	} else {
1792 	    RET_TO_BIF(am_EOT,DB_ERROR_NONE);
1793 	}
1794     }
1795 
1796     stack = get_any_stack((DbTable*)tb,stack_container);
1797     if (reverse) {
1798 	if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
1799             this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
1800 	    if (this)
1801                 lastkey = GETKEY(tb, this->dbterm.tpl);
1802 	    sc.end_condition = mpi.least;
1803 	}
1804         else {
1805             ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
1806             if (iter)
1807                 sc.common.root = catree_find_last_root(iter);
1808             else
1809                 sc.common.root = &tb->tree.root;
1810         }
1811 	traverse_backwards(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter);
1812     } else {
1813 	if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
1814             this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter);
1815 	    if (this)
1816                 lastkey = GETKEY(tb, this->dbterm.tpl);
1817 	    sc.end_condition = mpi.most;
1818 	}
1819         else {
1820             ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
1821             if (iter)
1822                 sc.common.root = catree_find_first_root(iter);
1823             else
1824                 sc.common.root = &tb->tree.root;
1825         }
1826 	traverse_forward(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter);
1827     }
1828     release_stack((DbTable*)tb,stack_container,stack);
1829 
1830     BUMP_REDS(p, 1000 - sc.max);
1831     if (sc.max > 0 || sc.got == chunk_size) {
1832 	Eterm *hp;
1833 	unsigned sz;
1834 
1835 	if (sc.got < chunk_size ||
1836 	    sc.lastobj == NULL) {
1837 	    /* We haven't got all and we haven't trapped
1838 	       which should mean we are at the end of the
1839 	       table, sc.lastobj may be NULL if the table was empty */
1840 
1841 	    if (!sc.got) {
1842 		RET_TO_BIF(am_EOT, DB_ERROR_NONE);
1843 	    } else {
1844 		RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
1845 				     sc.accum, NIL, am_EOT),
1846 			   DB_ERROR_NONE);
1847 	    }
1848 	}
1849 
1850 	key = GETKEY(tb, sc.lastobj);
1851 	sz = size_object(key);
1852 	hp = HAlloc(p, 9 + sz + ERTS_MAGIC_REF_THING_SIZE);
1853 	key = copy_struct(key, sz, &hp, &MSO(p));
1854 	mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp);
1855 
1856 	continuation = TUPLE8
1857 	    (hp,
1858 	     tid,
1859 	     key,
1860 	     sc.end_condition, /* From the match program,
1861 				  needn't be copied */
1862 	     make_small(chunk_size),
1863 	     mpb,
1864 	     NIL,
1865 	     make_small(reverse),
1866 	     make_small(0));
1867 	/* Don't let RET_TO_BIF macro free mpi.mp*/
1868 	*ret = bif_trap3(&ets_select_reverse_exp, p,
1869 			 sc.accum, NIL, continuation);
1870 	return DB_ERROR_NONE;
1871     }
1872 
1873     key = GETKEY(tb, sc.lastobj);
1874     sz = size_object(key);
1875     hp = HAlloc(p, 9 + sz + ERTS_MAGIC_REF_THING_SIZE);
1876     key = copy_struct(key, sz, &hp, &MSO(p));
1877 
1878     mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp);
1879     continuation = TUPLE8
1880 	(hp,
1881 	 tid,
1882 	 key,
1883 	 sc.end_condition, /* From the match program, needn't be copied */
1884 	 make_small(chunk_size),
1885 	 mpb,
1886 	 sc.accum,
1887 	 make_small(reverse),
1888 	 make_small(sc.got));
1889     /* Don't let RET_TO_BIF macro free mpi.mp*/
1890     *ret = bif_trap1(BIF_TRAP_EXPORT(BIF_ets_select_1), p, continuation);
1891     return DB_ERROR_NONE;
1892 
1893 #undef RET_TO_BIF
1894 
1895 }
1896 
db_select_chunk_tree(Process * p,DbTable * tbl,Eterm tid,Eterm pattern,Sint chunk_size,int reverse,Eterm * ret,enum DbIterSafety safety)1897 static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
1898                                 Eterm pattern, Sint chunk_size,
1899                                 int reverse,
1900                                 Eterm *ret, enum DbIterSafety safety)
1901 {
1902     DbTableTree *tb = &tbl->tree;
1903     return db_select_chunk_tree_common(p, tbl,
1904                                        tid, pattern, chunk_size,
1905                                        reverse, ret, tb, NULL);
1906 }
1907 
1908 
db_select_delete_continue_tree_common(Process * p,DbTable * tbl,Eterm continuation,Eterm * ret,DbTreeStack * stack,CATreeRootIterator * iter)1909 int db_select_delete_continue_tree_common(Process *p,
1910                                           DbTable *tbl,
1911                                           Eterm continuation,
1912                                           Eterm *ret,
1913                                           DbTreeStack* stack,
1914                                           CATreeRootIterator* iter)
1915 {
1916     struct select_delete_context sc;
1917     unsigned sz;
1918     Eterm *hp;
1919     Eterm lastkey;
1920     Eterm end_condition;
1921     Binary *mp;
1922     Eterm key;
1923     Eterm *tptr;
1924     Eterm eaccsum;
1925 
1926 #define RET_TO_BIF(Term, State) do { 		\
1927 	if (sc.erase_lastterm) {		\
1928 	    free_term(tbl, sc.lastterm);		\
1929 	}					\
1930 	*ret = (Term); 				\
1931 	return State; 				\
1932     } while(0);
1933 
1934     /* Decode continuation. We know it's correct, this can only be called
1935        by trapping */
1936 
1937     tptr = tuple_val(continuation);
1938 
1939     lastkey = tptr[2];
1940     end_condition = tptr[3];
1941 
1942     sc.erase_lastterm = 0; /* Before first RET_TO_BIF */
1943     sc.lastterm = NULL;
1944 
1945     mp = erts_db_get_match_prog_binary_unchecked(tptr[4]);
1946     sc.p = p;
1947     sc.tb = &tbl->common;
1948     sc.stack = stack;
1949     if (is_big(tptr[5])) {
1950 	sc.accum = big_to_uint32(tptr[5]);
1951     } else {
1952 	sc.accum = unsigned_val(tptr[5]);
1953     }
1954     sc.mp = mp;
1955     sc.end_condition = NIL;
1956     sc.max = 1000;
1957     sc.keypos = tbl->common.keypos;
1958 
1959     if (iter) {
1960         iter->next_route_key = lastkey;
1961         sc.common.root = catree_find_prev_root(iter, NULL);
1962     }
1963     else {
1964         sc.common.root = &tbl->tree.root;
1965     }
1966 
1967     if (sc.common.root) {
1968         traverse_backwards(&tbl->common, stack, lastkey, &doit_select_delete, &sc.common, iter);
1969 
1970         BUMP_REDS(p, 1000 - sc.max);
1971     }
1972 
1973     if (sc.max > 0) {
1974 	RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE);
1975     }
1976     key = GETKEY(&tbl->common, (sc.lastterm)->dbterm.tpl);
1977     if (end_condition != NIL &&
1978 	cmp_partly_bound(end_condition,key) > 0) { /* done anyway */
1979 	RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
1980     }
1981     /* Not done yet, let's trap. */
1982     sz = size_object(key);
1983     if (IS_USMALL(0, sc.accum)) {
1984 	hp = HAlloc(p, sz + 6);
1985 	eaccsum = make_small(sc.accum);
1986     }
1987     else {
1988 	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + 6);
1989 	eaccsum = uint_to_big(sc.accum, hp);
1990 	hp += BIG_UINT_HEAP_SIZE;
1991     }
1992     key = copy_struct(key, sz, &hp, &MSO(p));
1993     continuation = TUPLE5
1994 	(hp,
1995 	 tptr[1],
1996 	 key,
1997 	 tptr[3],
1998 	 tptr[4],
1999 	 eaccsum);
2000     RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p, continuation),
2001 	       DB_ERROR_NONE);
2002 
2003 #undef RET_TO_BIF
2004 }
2005 
db_select_delete_continue_tree(Process * p,DbTable * tbl,Eterm continuation,Eterm * ret,enum DbIterSafety * safety_p)2006 static int db_select_delete_continue_tree(Process *p,
2007 					  DbTable *tbl,
2008 					  Eterm continuation,
2009 					  Eterm *ret,
2010                                           enum DbIterSafety* safety_p)
2011 {
2012     DbTableTree *tb = &tbl->tree;
2013     ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy));
2014     return db_select_delete_continue_tree_common(p, tbl, continuation, ret,
2015                                                  &tb->static_stack, NULL);
2016 }
2017 
db_select_delete_tree_common(Process * p,DbTable * tbl,Eterm tid,Eterm pattern,Eterm * ret,DbTreeStack * stack,CATreeRootIterator * iter)2018 int db_select_delete_tree_common(Process *p, DbTable *tbl,
2019                                  Eterm tid, Eterm pattern,
2020                                  Eterm *ret,
2021                                  DbTreeStack* stack,
2022                                  CATreeRootIterator* iter)
2023 {
2024     struct select_delete_context sc;
2025     struct mp_info mpi;
2026     Eterm lastkey = THE_NON_VALUE;
2027     Eterm key;
2028     Eterm continuation;
2029     unsigned sz;
2030     Eterm *hp;
2031     TreeDbTerm *this;
2032     int errcode;
2033     Eterm mpb;
2034     Eterm eaccsum;
2035 
2036 #define RET_TO_BIF(Term,RetVal) do { 	       	\
2037 	if (mpi.mp != NULL) {			\
2038 	    erts_bin_free(mpi.mp);       	\
2039 	}					\
2040 	if (sc.erase_lastterm) {                \
2041 	    free_term(tbl, sc.lastterm);         \
2042 	}                                       \
2043 	*ret = (Term); 				\
2044 	return RetVal; 			        \
2045     } while(0)
2046 
2047     mpi.mp = NULL;
2048 
2049     sc.accum = 0;
2050     sc.erase_lastterm = 0;
2051     sc.lastterm = NULL;
2052     sc.p = p;
2053     sc.max = 1000;
2054     sc.end_condition = NIL;
2055     sc.keypos = tbl->common.keypos;
2056     sc.tb = &tbl->common;
2057     sc.stack = stack;
2058 
2059     if ((errcode = analyze_pattern(&tbl->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
2060 	RET_TO_BIF(0,errcode);
2061     }
2062 
2063     if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
2064 	RET_TO_BIF(make_small(0),DB_ERROR_NONE);
2065 	/* can't possibly match anything */
2066     }
2067 
2068     sc.mp = mpi.mp;
2069 
2070     if (mpi.key_boundness == MS_KEY_BOUND) {
2071         ASSERT(CMP_EQ(mpi.least, mpi.most));
2072         if (iter)
2073             sc.common.root = catree_find_root(mpi.least, iter);
2074         else
2075             sc.common.root = &tbl->tree.root;
2076         this =  find_node(&tbl->common, *sc.common.root, mpi.least, NULL);
2077         if (this)
2078             doit_select_delete(&tbl->common, this, &sc.common, 0 /* direction doesn't
2079 						      matter */);
2080 	RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
2081     }
2082 
2083     if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
2084         this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter);
2085         if (this)
2086             lastkey = GETKEY(&tbl->common, this->dbterm.tpl);
2087 	sc.end_condition = mpi.least;
2088     }
2089     else {
2090         ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
2091         if (iter)
2092             sc.common.root = catree_find_last_root(iter);
2093         else
2094             sc.common.root = &tbl->tree.root;
2095     }
2096 
2097     traverse_backwards(&tbl->common, stack, lastkey,
2098                        &doit_select_delete, &sc.common, iter);
2099     BUMP_REDS(p, 1000 - sc.max);
2100 
2101     if (sc.max > 0) {
2102 	RET_TO_BIF(erts_make_integer(sc.accum,p), DB_ERROR_NONE);
2103     }
2104 
2105     key = GETKEY(&tbl->common, (sc.lastterm)->dbterm.tpl);
2106     sz = size_object(key);
2107     if (IS_USMALL(0, sc.accum)) {
2108 	hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6);
2109 	eaccsum = make_small(sc.accum);
2110     }
2111     else {
2112 	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + ERTS_MAGIC_REF_THING_SIZE + 6);
2113 	eaccsum = uint_to_big(sc.accum, hp);
2114 	hp += BIG_UINT_HEAP_SIZE;
2115     }
2116     key = copy_struct(key, sz, &hp, &MSO(p));
2117     mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp);
2118 
2119     continuation = TUPLE5
2120 	(hp,
2121 	 tid,
2122 	 key,
2123 	 sc.end_condition, /* From the match program, needn't be copied */
2124 	 mpb,
2125 	 eaccsum);
2126 
2127     /* Don't free mpi.mp, so don't use macro */
2128     if (sc.erase_lastterm) {
2129 	free_term(tbl, sc.lastterm);
2130     }
2131     *ret = bif_trap1(&ets_select_delete_continue_exp, p, continuation);
2132     return DB_ERROR_NONE;
2133 
2134 #undef RET_TO_BIF
2135 
2136 }
2137 
db_select_delete_tree(Process * p,DbTable * tbl,Eterm tid,Eterm pattern,Eterm * ret,enum DbIterSafety safety)2138 static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
2139 				 Eterm pattern, Eterm *ret,
2140                                  enum DbIterSafety safety)
2141 {
2142     DbTableTree *tb = &tbl->tree;
2143     return db_select_delete_tree_common(p, tbl, tid, pattern, ret,
2144                                         &tb->static_stack, NULL);
2145 }
2146 
db_select_replace_continue_tree_common(Process * p,DbTable * tbl,Eterm continuation,Eterm * ret,DbTableTree * stack_container,CATreeRootIterator * iter)2147 int db_select_replace_continue_tree_common(Process *p,
2148                                            DbTable *tbl,
2149                                            Eterm continuation,
2150                                            Eterm *ret,
2151                                            DbTableTree *stack_container,
2152                                            CATreeRootIterator* iter)
2153 {
2154     DbTreeStack* stack;
2155     struct select_replace_context sc;
2156     unsigned sz;
2157     Eterm *hp;
2158     Eterm lastkey;
2159     Eterm end_condition;
2160     Binary *mp;
2161     Eterm key;
2162     Eterm *tptr;
2163     Eterm ereplaced;
2164     Sint prev_replaced;
2165 
2166 
2167 #define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
2168 
2169     /* Decode continuation. We know it's a tuple and everything else as
2170        this is only called by ourselves */
2171 
2172     /* continuation:
2173        {Table, Lastkey, EndCondition, MatchProgBin, HowManyReplaced}*/
2174 
2175     tptr = tuple_val(continuation);
2176 
2177     if (arityval(*tptr) != 5)
2178         erts_exit(ERTS_ERROR_EXIT,"Internal error in ets:select_replace/1");
2179 
2180     lastkey = tptr[2];
2181     end_condition = tptr[3];
2182     mp = erts_db_get_match_prog_binary_unchecked(tptr[4]);
2183 
2184     sc.p = p;
2185     sc.mp = mp;
2186     sc.end_condition = NIL;
2187     sc.lastobj = NULL;
2188     sc.max = 1000;
2189     sc.keypos = tbl->common.keypos;
2190     if (is_big(tptr[5])) {
2191         sc.replaced = big_to_uint32(tptr[5]);
2192     } else {
2193         sc.replaced = unsigned_val(tptr[5]);
2194     }
2195     prev_replaced = sc.replaced;
2196 
2197     if (iter) {
2198         iter->next_route_key = lastkey;
2199         sc.common.root = catree_find_prev_root(iter, NULL);
2200     }
2201     else {
2202         sc.common.root = &tbl->tree.root;
2203     }
2204 
2205     stack = get_any_stack(tbl, stack_container);
2206     traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace,
2207                               &sc.common, iter);
2208     release_stack(tbl, stack_container,stack);
2209 
2210     // the more objects we've replaced, the more reductions we've consumed
2211     BUMP_REDS(p, MIN(2000, (1000 - sc.max) + (sc.replaced - prev_replaced)));
2212 
2213     if (sc.max > 0) {
2214         RET_TO_BIF(erts_make_integer(sc.replaced,p), DB_ERROR_NONE);
2215     }
2216     key = GETKEY(tbl, sc.lastobj);
2217     if (end_condition != NIL &&
2218             (cmp_partly_bound(end_condition,key) > 0)) {
2219         /* done anyway */
2220         RET_TO_BIF(make_small(sc.replaced),DB_ERROR_NONE);
2221     }
2222     /* Not done yet, let's trap. */
2223     sz = size_object(key);
2224     if (IS_USMALL(0, sc.replaced)) {
2225         hp = HAlloc(p, sz + 6);
2226         ereplaced = make_small(sc.replaced);
2227     }
2228     else {
2229         hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + 6);
2230         ereplaced = uint_to_big(sc.replaced, hp);
2231         hp += BIG_UINT_HEAP_SIZE;
2232     }
2233     key = copy_struct(key, sz, &hp, &MSO(p));
2234     continuation = TUPLE5
2235         (hp,
2236          tptr[1],
2237          key,
2238          tptr[3],
2239          tptr[4],
2240          ereplaced);
2241     RET_TO_BIF(bif_trap1(&ets_select_replace_continue_exp, p, continuation),
2242             DB_ERROR_NONE);
2243 
2244 #undef RET_TO_BIF
2245 }
2246 
db_select_replace_continue_tree(Process * p,DbTable * tbl,Eterm continuation,Eterm * ret,enum DbIterSafety * safety_p)2247 static int db_select_replace_continue_tree(Process *p,
2248                                            DbTable *tbl,
2249                                            Eterm continuation,
2250                                            Eterm *ret,
2251                                            enum DbIterSafety* safety_p)
2252 {
2253     return db_select_replace_continue_tree_common(p, tbl, continuation, ret,
2254                                                   &tbl->tree, NULL);
2255 }
2256 
db_select_replace_tree_common(Process * p,DbTable * tbl,Eterm tid,Eterm pattern,Eterm * ret,DbTableTree * stack_container,CATreeRootIterator * iter)2257 int db_select_replace_tree_common(Process *p, DbTable *tbl,
2258                                   Eterm tid, Eterm pattern, Eterm *ret,
2259                                   DbTableTree *stack_container,
2260                                   CATreeRootIterator* iter)
2261 {
2262     DbTreeStack* stack;
2263     struct select_replace_context sc;
2264     struct mp_info mpi;
2265     Eterm lastkey = THE_NON_VALUE;
2266     Eterm key;
2267     Eterm continuation;
2268     unsigned sz;
2269     Eterm *hp;
2270     TreeDbTerm *this;
2271     int errcode;
2272     Eterm ereplaced;
2273     Eterm mpb;
2274 
2275 
2276 #define RET_TO_BIF(Term,RetVal) do { 	       	\
2277 	if (mpi.mp != NULL) {			\
2278 	    erts_bin_free(mpi.mp);       	\
2279 	}					\
2280 	*ret = (Term); 				\
2281 	return RetVal; 			        \
2282     } while(0)
2283 
2284     mpi.mp = NULL;
2285 
2286     sc.lastobj = NULL;
2287     sc.p = p;
2288     sc.tb = &tbl->common;
2289     sc.max = 1000;
2290     sc.end_condition = NIL;
2291     sc.keypos = tbl->common.keypos;
2292     sc.replaced = 0;
2293 
2294     if ((errcode = analyze_pattern(&tbl->common, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) {
2295         RET_TO_BIF(NIL,errcode);
2296     }
2297 
2298     if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
2299         RET_TO_BIF(make_small(0),DB_ERROR_NONE);
2300         /* can't possibly match anything */
2301     }
2302 
2303     sc.mp = mpi.mp;
2304 
2305     if (mpi.key_boundness == MS_KEY_BOUND) {
2306         TreeDbTerm** pp;
2307         ASSERT(CMP_EQ(mpi.least, mpi.most));
2308         if (iter)
2309             sc.common.root = catree_find_root(mpi.least, iter);
2310         else
2311             sc.common.root = &tbl->tree.root;
2312         pp = find_node2(&tbl->common, sc.common.root, mpi.least);
2313         if (pp) {
2314             doit_select_replace(&tbl->common, pp, &sc.common, 0 /* dummy */);
2315             reset_static_stack(stack_container); /* may refer replaced term */
2316         }
2317         RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
2318     }
2319 
2320     stack = get_any_stack(tbl,stack_container);
2321 
2322     if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
2323         this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter);
2324         if (this)
2325             lastkey = GETKEY(tbl, this->dbterm.tpl);
2326         sc.end_condition = mpi.least;
2327     }
2328     else {
2329         ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
2330         if (iter)
2331             sc.common.root = catree_find_last_root(iter);
2332         else
2333             sc.common.root = &tbl->tree.root;
2334     }
2335 
2336     traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace,
2337                               &sc.common, iter);
2338     release_stack(tbl,stack_container,stack);
2339     // the more objects we've replaced, the more reductions we've consumed
2340     BUMP_REDS(p, MIN(2000, (1000 - sc.max) + sc.replaced));
2341     if (sc.max > 0) {
2342         RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
2343     }
2344 
2345     key = GETKEY(tbl, sc.lastobj);
2346     sz = size_object(key);
2347     if (IS_USMALL(0, sc.replaced)) {
2348         hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6);
2349         ereplaced = make_small(sc.replaced);
2350     }
2351     else {
2352         hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + ERTS_MAGIC_REF_THING_SIZE + 6);
2353         ereplaced = uint_to_big(sc.replaced, hp);
2354         hp += BIG_UINT_HEAP_SIZE;
2355     }
2356     key = copy_struct(key, sz, &hp, &MSO(p));
2357     mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp);
2358 
2359     continuation = TUPLE5
2360         (hp,
2361          tid,
2362          key,
2363          sc.end_condition, /* From the match program, needn't be copied */
2364          mpb,
2365          ereplaced);
2366 
2367     /* Don't free mpi.mp, so don't use macro */
2368     *ret = bif_trap1(&ets_select_replace_continue_exp, p, continuation);
2369     return DB_ERROR_NONE;
2370 
2371 #undef RET_TO_BIF
2372 
2373 }
2374 
db_select_replace_tree(Process * p,DbTable * tbl,Eterm tid,Eterm pattern,Eterm * ret,enum DbIterSafety safety)2375 static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
2376                                   Eterm pattern, Eterm *ret,
2377                                   enum DbIterSafety safety)
2378 {
2379     return db_select_replace_tree_common(p, tbl, tid, pattern, ret,
2380                                          &tbl->tree, NULL);
2381 }
2382 
db_take_tree_common(Process * p,DbTable * tbl,TreeDbTerm ** root,Eterm key,Eterm * ret,DbTreeStack * stack)2383 int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
2384                         Eterm key, Eterm *ret,
2385                         DbTreeStack *stack /* NULL if no static stack */)
2386 {
2387     TreeDbTerm *this;
2388 
2389     *ret = NIL;
2390     this = linkout_tree(&tbl->common, root, key, stack);
2391     if (this) {
2392         Eterm copy, *hp, *hend;
2393 
2394         hp = HAlloc(p, this->dbterm.size + 2);
2395         hend = hp + this->dbterm.size + 2;
2396         copy = db_copy_object_from_ets(&tbl->common,
2397                                        &this->dbterm, &hp, &MSO(p));
2398         *ret = CONS(hp, copy, NIL);
2399         hp += 2;
2400         HRelease(p, hend, hp);
2401         free_term(tbl, this);
2402     }
2403     return DB_ERROR_NONE;
2404 }
2405 
db_take_tree(Process * p,DbTable * tbl,Eterm key,Eterm * ret)2406 static int db_take_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
2407 {
2408     DbTableTree *tb = &tbl->tree;
2409     return db_take_tree_common(p, tbl, &tb->root,
2410                                key, ret, &tb->static_stack);
2411 }
2412 
2413 /*
2414 ** Other interface routines (not directly coupled to one bif)
2415 */
2416 
db_print_tree_common(fmtfn_t to,void * to_arg,int show,TreeDbTerm * root,DbTable * tbl)2417 void db_print_tree_common(fmtfn_t to, void *to_arg,
2418                           int show, TreeDbTerm *root, DbTable *tbl)
2419 {
2420 #ifdef TREE_DEBUG
2421     if (show)
2422 	erts_print(to, to_arg, "\nTree data dump:\n"
2423 		   "------------------------------------------------\n");
2424     do_dump_tree2(&tbl->common, to, to_arg, show, root, 0);
2425     if (show)
2426 	erts_print(to, to_arg, "\n"
2427 		   "------------------------------------------------\n");
2428 #else
2429     erts_print(to, to_arg, "Ordered set (AVL tree), Elements: %d\n",
2430                erts_flxctr_read_approx(&tbl->common.counters, ERTS_DB_TABLE_NITEMS_COUNTER_ID));
2431 #endif
2432 }
2433 
2434 /* Display tree contents (for dump) */
db_print_tree(fmtfn_t to,void * to_arg,int show,DbTable * tbl)2435 static void db_print_tree(fmtfn_t to, void *to_arg,
2436 			  int show,
2437 			  DbTable *tbl)
2438 {
2439     DbTableTree *tb = &tbl->tree;
2440     db_print_tree_common(to, to_arg, show, tb->root, tbl);
2441 }
2442 
2443 /* release all memory occupied by a single table */
db_free_empty_table_tree(DbTable * tbl)2444 static int db_free_empty_table_tree(DbTable *tbl)
2445 {
2446     ASSERT(tbl->tree.root == NULL);
2447     while (db_free_table_continue_tree(tbl, ERTS_SWORD_MAX) < 0)
2448 	;
2449     return 1;
2450 }
2451 
db_free_table_continue_tree(DbTable * tbl,SWord reds)2452 static SWord db_free_table_continue_tree(DbTable *tbl, SWord reds)
2453 {
2454     DbTableTree *tb = &tbl->tree;
2455 
2456     if (!tb->deletion) {
2457 	tb->static_stack.pos = 0;
2458 	tb->deletion = 1;
2459 	PUSH_NODE(&tb->static_stack, tb->root);
2460     }
2461     reds = do_free_tree_continue(tb, reds);
2462     if (reds >= 0) {		/* Completely done. */
2463 	erts_db_free(ERTS_ALC_T_DB_STK,
2464 		     (DbTable *) tb,
2465 		     (void *) tb->static_stack.array,
2466 		     sizeof(TreeDbTerm *) * STACK_NEED);
2467 	ASSERT(erts_flxctr_is_snapshot_ongoing(&tb->common.counters) ||
2468                ((APPROX_MEM_CONSUMED(tb)
2469                  == (sizeof(DbTable) +
2470                      erts_flxctr_nr_of_allocated_bytes(&tb->common.counters))) ||
2471                 (APPROX_MEM_CONSUMED(tb)
2472                  == (sizeof(DbTable) +
2473                      sizeof(DbFixation) +
2474                      erts_flxctr_nr_of_allocated_bytes(&tb->common.counters)))));
2475     }
2476     return reds;
2477 }
2478 
db_delete_all_objects_tree(Process * p,DbTable * tbl,SWord reds,Eterm * nitems_holder_wb)2479 static SWord db_delete_all_objects_tree(Process* p,
2480                                         DbTable* tbl,
2481                                         SWord reds,
2482                                         Eterm* nitems_holder_wb)
2483 {
2484     if (nitems_holder_wb != NULL) {
2485         Uint nr_of_items =
2486             erts_flxctr_read_centralized(&tbl->common.counters,
2487                                          ERTS_DB_TABLE_NITEMS_COUNTER_ID);
2488         *nitems_holder_wb = erts_make_integer(nr_of_items, p);
2489     }
2490     reds = db_free_table_continue_tree(tbl, reds);
2491     if (reds < 0)
2492         return reds;
2493     db_create_tree(p, tbl);
2494     RESET_NITEMS(tbl);
2495     return reds;
2496 }
2497 
db_delete_all_objects_get_nitems_from_holder_tree(Process * p,Eterm holder)2498 static Eterm db_delete_all_objects_get_nitems_from_holder_tree(Process* p,
2499                                                                Eterm holder)
2500 {
2501     (void)p;
2502     return holder;
2503 }
2504 
2505 static void do_db_tree_foreach_offheap(TreeDbTerm *,
2506 				       void (*)(ErlOffHeap *, void *),
2507 				       void *);
2508 
db_foreach_offheap_tree_common(TreeDbTerm * root,void (* func)(ErlOffHeap *,void *),void * arg)2509 void db_foreach_offheap_tree_common(TreeDbTerm *root,
2510                                     void (*func)(ErlOffHeap *, void *),
2511                                     void * arg)
2512 {
2513     do_db_tree_foreach_offheap(root, func, arg);
2514 }
2515 
db_foreach_offheap_tree(DbTable * tbl,void (* func)(ErlOffHeap *,void *),void * arg)2516 static void db_foreach_offheap_tree(DbTable *tbl,
2517 				    void (*func)(ErlOffHeap *, void *),
2518 				    void * arg)
2519 {
2520     db_foreach_offheap_tree_common(tbl->tree.root, func, arg);
2521 }
2522 
2523 
2524 /*
2525 ** Functions for internal use
2526 */
2527 
2528 
2529 static void
do_db_tree_foreach_offheap(TreeDbTerm * tdbt,void (* func)(ErlOffHeap *,void *),void * arg)2530 do_db_tree_foreach_offheap(TreeDbTerm *tdbt,
2531 			   void (*func)(ErlOffHeap *, void *),
2532 			   void * arg)
2533 {
2534     ErlOffHeap tmp_offheap;
2535     if(!tdbt)
2536 	return;
2537     do_db_tree_foreach_offheap(tdbt->left, func, arg);
2538     tmp_offheap.first = tdbt->dbterm.first_oh;
2539     tmp_offheap.overhead = 0;
2540     (*func)(&tmp_offheap, arg);
2541     tdbt->dbterm.first_oh = tmp_offheap.first;
2542     do_db_tree_foreach_offheap(tdbt->right, func, arg);
2543 }
2544 
linkout_tree(DbTableCommon * tb,TreeDbTerm ** root,Eterm key,DbTreeStack * stack)2545 static TreeDbTerm *linkout_tree(DbTableCommon *tb, TreeDbTerm **root,
2546                                 Eterm key, DbTreeStack *stack) {
2547     TreeDbTerm **tstack[STACK_NEED];
2548     int tpos = 0;
2549     int dstack[STACK_NEED+1];
2550     int dpos = 0;
2551     int state = 0;
2552     TreeDbTerm **this = root;
2553     Sint c;
2554     int dir;
2555     TreeDbTerm *q = NULL;
2556 
2557     /*
2558      * Somewhat complicated, deletion in an AVL tree,
2559      * The two helpers balance_left and balance_right are used to
2560      * keep the balance. As in insert, we do the stacking ourselves.
2561      */
2562 
2563     reset_stack(stack);
2564     dstack[dpos++] = DIR_END;
2565     for (;;) {
2566 	if (!*this) { /* Failure */
2567 	    return NULL;
2568 	} else if ((c = cmp_key(tb, key, *this)) < 0) {
2569 	    dstack[dpos++] = DIR_LEFT;
2570 	    tstack[tpos++] = this;
2571 	    this = &((*this)->left);
2572 	} else if (c > 0) { /* go right */
2573 	    dstack[dpos++] = DIR_RIGHT;
2574 	    tstack[tpos++] = this;
2575 	    this = &((*this)->right);
2576 	} else { /* Equal key, found the one to delete*/
2577 	    q = (*this);
2578 	    if (q->right == NULL) {
2579 		(*this) = q->left;
2580 		state = 1;
2581 	    } else if (q->left == NULL) {
2582 		(*this) = q->right;
2583 		state = 1;
2584 	    } else {
2585 		dstack[dpos++] = DIR_LEFT;
2586 		tstack[tpos++] = this;
2587 		state = delsub(this);
2588 	    }
2589             DEC_NITEMS(((DbTable*)tb));
2590 	    break;
2591 	}
2592     }
2593     while (state && ( dir = dstack[--dpos] ) != DIR_END) {
2594 	this = tstack[--tpos];
2595 	if (dir == DIR_LEFT) {
2596 	    state = tree_balance_left(this);
2597 	} else {
2598 	    state = tree_balance_right(this);
2599 	}
2600     }
2601     return q;
2602 }
2603 
linkout_object_tree(DbTableCommon * tb,TreeDbTerm ** root,Eterm object,DbTableTree * stack)2604 static TreeDbTerm *linkout_object_tree(DbTableCommon *tb,  TreeDbTerm **root,
2605 				       Eterm object, DbTableTree *stack)
2606 {
2607     TreeDbTerm **tstack[STACK_NEED];
2608     int tpos = 0;
2609     int dstack[STACK_NEED+1];
2610     int dpos = 0;
2611     int state = 0;
2612     TreeDbTerm **this = root;
2613     Sint c;
2614     int dir;
2615     TreeDbTerm *q = NULL;
2616     Eterm key;
2617 
2618     /*
2619      * Somewhat complicated, deletion in an AVL tree,
2620      * The two helpers balance_left and balance_right are used to
2621      * keep the balance. As in insert, we do the stacking ourselves.
2622      */
2623 
2624 
2625     key = GETKEY(tb, tuple_val(object));
2626 
2627     reset_static_stack(stack);
2628     dstack[dpos++] = DIR_END;
2629     for (;;) {
2630 	if (!*this) { /* Failure */
2631 	    return NULL;
2632 	} else if ((c = cmp_key(tb,key,*this)) < 0) {
2633 	    dstack[dpos++] = DIR_LEFT;
2634 	    tstack[tpos++] = this;
2635 	    this = &((*this)->left);
2636 	} else if (c > 0) { /* go right */
2637 	    dstack[dpos++] = DIR_RIGHT;
2638 	    tstack[tpos++] = this;
2639 	    this = &((*this)->right);
2640 	} else { /* Equal key, found the only possible matching object*/
2641 	    if (!db_eq(tb,object,&(*this)->dbterm)) {
2642 		return NULL;
2643 	    }
2644 	    q = (*this);
2645 	    if (q->right == NULL) {
2646 		(*this) = q->left;
2647 		state = 1;
2648 	    } else if (q->left == NULL) {
2649 		(*this) = q->right;
2650 		state = 1;
2651 	    } else {
2652 		dstack[dpos++] = DIR_LEFT;
2653 		tstack[tpos++] = this;
2654 		state = delsub(this);
2655 	    }
2656             DEC_NITEMS(((DbTable*)tb));
2657 	    break;
2658 	}
2659     }
2660     while (state && ( dir = dstack[--dpos] ) != DIR_END) {
2661 	this = tstack[--tpos];
2662 	if (dir == DIR_LEFT) {
2663 	    state = tree_balance_left(this);
2664 	} else {
2665 	    state = tree_balance_right(this);
2666 	}
2667     }
2668     return q;
2669 }
2670 
2671 /*
2672 ** For the select functions, analyzes the pattern and determines which
2673 ** part of the tree should be searched. Also compiles the match program
2674 */
analyze_pattern(DbTableCommon * tb,Eterm pattern,extra_match_validator_t extra_validator,struct mp_info * mpi)2675 static int analyze_pattern(DbTableCommon *tb, Eterm pattern,
2676                            extra_match_validator_t extra_validator, /* Optional callback */
2677                            struct mp_info *mpi)
2678 {
2679     Eterm lst, tpl, ttpl;
2680     Eterm *matches,*guards, *bodies;
2681     Eterm sbuff[30];
2682     Eterm *buff = sbuff;
2683     Eterm *ptpl;
2684     int i;
2685     int num_heads = 0;
2686     Eterm least = THE_NON_VALUE;
2687     Eterm most = THE_NON_VALUE;
2688     enum ms_key_boundness boundness;
2689     Uint freason;
2690 
2691     mpi->key_boundness = MS_KEY_IMPOSSIBLE;
2692     mpi->mp = NULL;
2693 
2694     for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
2695 	++num_heads;
2696 
2697     if (lst != NIL) {/* proper list... */
2698 	return DB_ERROR_BADPARAM;
2699     }
2700     if (num_heads > 10) {
2701 	buff = erts_alloc(ERTS_ALC_T_DB_TMP, sizeof(Eterm) * num_heads * 3);
2702     }
2703 
2704     matches = buff;
2705     guards = buff + num_heads;
2706     bodies = buff + (num_heads * 2);
2707 
2708     i = 0;
2709     for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) {
2710         Eterm match;
2711         Eterm guard;
2712         Eterm body;
2713         Eterm key;
2714 
2715 	ttpl = CAR(list_val(lst));
2716 	if (!is_tuple(ttpl)) {
2717 	    if (buff != sbuff) {
2718 		erts_free(ERTS_ALC_T_DB_TMP, buff);
2719 	    }
2720 	    return DB_ERROR_BADPARAM;
2721 	}
2722 	ptpl = tuple_val(ttpl);
2723 	if (ptpl[0] != make_arityval(3U)) {
2724 	    if (buff != sbuff) {
2725 		erts_free(ERTS_ALC_T_DB_TMP, buff);
2726 	    }
2727 	    return DB_ERROR_BADPARAM;
2728 	}
2729 	matches[i] = match = tpl = ptpl[1];
2730 	guards[i] = guard = ptpl[2];
2731 	bodies[i] = body = ptpl[3];
2732 
2733         if(extra_validator != NULL && !extra_validator(tb->keypos, match, guard, body)) {
2734 	    if (buff != sbuff) {
2735 		erts_free(ERTS_ALC_T_DB_TMP, buff);
2736 	    }
2737             return DB_ERROR_BADPARAM;
2738         }
2739 
2740 	if (!is_list(body) || CDR(list_val(body)) != NIL ||
2741 	    CAR(list_val(body)) != am_DollarUnderscore) {
2742 	}
2743 	++i;
2744 
2745         boundness = key_boundness(tb, tpl, &key);
2746 	switch (boundness)
2747         {
2748         case MS_KEY_BOUND:
2749         case MS_KEY_PARTIALLY_BOUND:
2750             if (is_non_value(least) || partly_bound_can_match_lesser(key,least)) {
2751                 least = key;
2752             }
2753             if (is_non_value(most) || partly_bound_can_match_greater(key,most)) {
2754                 most = key;
2755             }
2756             break;
2757         case MS_KEY_IMPOSSIBLE:
2758         case MS_KEY_UNBOUND:
2759             break;
2760         }
2761         if (mpi->key_boundness > boundness)
2762             mpi->key_boundness = boundness;
2763     }
2764 
2765     if (mpi->key_boundness == MS_KEY_BOUND && !CMP_EQ(least, most)) {
2766         /* Several different bound keys */
2767         mpi->key_boundness = MS_KEY_PARTIALLY_BOUND;
2768     }
2769     mpi->least = least;
2770     mpi->most = most;
2771 
2772     /*
2773      * It would be nice not to compile the match_spec if nothing could match,
2774      * but then the select calls would not fail like they should on bad
2775      * match specs that happen to specify non existent keys etc.
2776      */
2777     if ((mpi->mp = db_match_compile(matches, guards, bodies,
2778 				    num_heads, DCOMP_TABLE, NULL,
2779                                     &freason))
2780 	== NULL) {
2781 	if (buff != sbuff) {
2782 	    erts_free(ERTS_ALC_T_DB_TMP, buff);
2783 	}
2784         switch (freason) {
2785         case BADARG: return DB_ERROR_BADPARAM;
2786         case SYSTEM_LIMIT: return DB_ERROR_SYSRES;
2787         default: ASSERT(0); return DB_ERROR_UNSPEC;
2788         }
2789     }
2790     if (buff != sbuff) {
2791 	erts_free(ERTS_ALC_T_DB_TMP, buff);
2792     }
2793     return DB_ERROR_NONE;
2794 }
2795 
do_free_tree_continue(DbTableTree * tb,SWord reds)2796 static SWord do_free_tree_continue(DbTableTree *tb, SWord reds)
2797 {
2798     TreeDbTerm *root;
2799     TreeDbTerm *p;
2800 
2801     for (;;) {
2802 	root = POP_NODE(&tb->static_stack);
2803 	if (root == NULL) break;
2804 	for (;;) {
2805 	    if ((p = root->left) != NULL) {
2806 		root->left = NULL;
2807 		PUSH_NODE(&tb->static_stack, root);
2808 		root = p;
2809 	    } else if ((p = root->right) != NULL) {
2810 		root->right = NULL;
2811 		PUSH_NODE(&tb->static_stack, root);
2812 		root = p;
2813 	    } else {
2814 		free_term((DbTable*)tb, root);
2815 		if (--reds < 0) {
2816                     return reds;   /* Done enough for now */
2817                 }
2818                 break;
2819 	    }
2820 	}
2821     }
2822     return reds;
2823 }
2824 
2825 /*
2826  * Deletion helpers
2827  */
tree_balance_left(TreeDbTerm ** this)2828 int tree_balance_left(TreeDbTerm **this)
2829 {
2830     TreeDbTerm *p, *p1, *p2;
2831     int b1, b2, h = 1;
2832 
2833     p = *this;
2834     switch (p->balance) {
2835     case -1:
2836 	p->balance = 0;
2837 	break;
2838     case 0:
2839 	p->balance = 1;
2840 	h = 0;
2841 	break;
2842     case 1:
2843 	p1 = p->right;
2844 	b1 = p1->balance;
2845 	if (b1 >= 0) { /* Single RR rotation */
2846 	    p->right = p1->left;
2847 	    p1->left = p;
2848 	    if (b1 == 0) {
2849 		p->balance = 1;
2850 		p1->balance = -1;
2851 		h = 0;
2852 	    } else {
2853 		p->balance = p1->balance = 0;
2854 	    }
2855 	    (*this) = p1;
2856 	} else { /* Double RL rotation */
2857 	    p2 = p1->left;
2858 	    b2 = p2->balance;
2859 	    p1->left = p2->right;
2860 	    p2->right = p1;
2861 	    p->right = p2->left;
2862 	    p2->left = p;
2863 	    p->balance = (b2 == 1) ? -1 : 0;
2864 	    p1->balance = (b2 == -1) ? 1 : 0;
2865 	    p2->balance = 0;
2866 	    (*this) = p2;
2867 	}
2868 	break;
2869     }
2870     return h;
2871 }
2872 
tree_balance_right(TreeDbTerm ** this)2873 int tree_balance_right(TreeDbTerm **this)
2874 {
2875     TreeDbTerm *p, *p1, *p2;
2876     int b1, b2, h = 1;
2877 
2878     p = *this;
2879     switch (p->balance) {
2880     case 1:
2881 	p->balance = 0;
2882 	break;
2883     case 0:
2884 	p->balance = -1;
2885 	h = 0;
2886 	break;
2887     case -1:
2888 	p1 = p->left;
2889 	b1 = p1->balance;
2890 	if (b1 <= 0) { /* Single LL rotation */
2891 	    p->left = p1->right;
2892 	    p1->right = p;
2893 	    if (b1 == 0) {
2894 		p->balance = -1;
2895 		p1->balance = 1;
2896 		h = 0;
2897 	    } else {
2898 		p->balance = p1->balance = 0;
2899 	    }
2900 	    (*this) = p1;
2901 	} else { /* Double LR rotation */
2902 	    p2 = p1->right;
2903 	    b2 = p2->balance;
2904 	    p1->right = p2->left;
2905 	    p2->left = p1;
2906 	    p->left = p2->right;
2907 	    p2->right = p;
2908 	    p->balance = (b2 == -1) ? 1 : 0;
2909 	    p1->balance = (b2 == 1) ? -1 : 0;
2910 	    p2->balance = 0;
2911 	    (*this) = p2;
2912 	}
2913     }
2914     return h;
2915 }
2916 
delsub(TreeDbTerm ** this)2917 static int delsub(TreeDbTerm **this)
2918 {
2919     TreeDbTerm **tstack[STACK_NEED];
2920     int tpos = 0;
2921     TreeDbTerm *q = (*this);
2922     TreeDbTerm **r = &(q->left);
2923     int h;
2924 
2925     /*
2926      * Walk down the tree to the right and search
2927      * for a void right child, pick that child out
2928      * and return it to be put in the deleted
2929      * object's place.
2930      */
2931 
2932     while ((*r)->right != NULL) {
2933 	tstack[tpos++] = r;
2934 	r = &((*r)->right);
2935     }
2936     *this = *r;
2937     *r = (*r)->left;
2938     (*this)->left = q->left;
2939     (*this)->right = q->right;
2940     (*this)->balance = q->balance;
2941     tstack[0] = &((*this)->left);
2942     h = 1;
2943     while (tpos && h) {
2944 	r = tstack[--tpos];
2945 	h = tree_balance_right(r);
2946     }
2947     return h;
2948 }
2949 
2950 /*
2951  * Helper for db_slot
2952  */
2953 
slot_search(Process * p,TreeDbTerm * root,Sint slot,DbTable * tb,DbTableTree * stack_container,CATreeRootIterator * iter,int * is_EOT)2954 static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root,
2955                                Sint slot, DbTable *tb,
2956                                DbTableTree *stack_container,
2957                                CATreeRootIterator *iter,
2958                                int* is_EOT)
2959 {
2960     TreeDbTerm *this;
2961     TreeDbTerm *tmp;
2962     TreeDbTerm *lastobj;
2963     Eterm lastkey;
2964     TreeDbTerm **pp;
2965     DbTreeStack* stack;
2966 
2967     if (iter) {
2968         /* Find first non-empty tree */
2969         while (!root) {
2970             TreeDbTerm** pp = catree_find_next_root(iter, NULL);
2971             if (!pp)
2972                 return NULL;
2973             root = *pp;
2974         }
2975     }
2976 
2977     stack = get_any_stack(tb,stack_container);
2978     ASSERT(stack != NULL);
2979 
2980     if (slot == 1) { /* Don't search from where we are if we are
2981 			looking for the first slot */
2982 	stack->slot = 0;
2983     }
2984 
2985     if (stack->slot == 0) { /* clear stack if slot positions
2986 				are not recorded */
2987 	stack->pos = 0;
2988     }
2989     while (1) {
2990         if (EMPTY_NODE(stack)) {
2991             this = root;
2992             if (this == NULL)
2993                 goto next_root;
2994             while (this->left != NULL){
2995                 PUSH_NODE(stack, this);
2996                 this = this->left;
2997             }
2998             PUSH_NODE(stack, this);
2999             stack->slot++;
3000         }
3001         this = TOP_NODE(stack);
3002         while (stack->slot != slot) {
3003             ASSERT(this);
3004             lastobj = this;
3005             if (slot > stack->slot) {
3006                 if (this->right != NULL) {
3007                     this = this->right;
3008                     while (this->left != NULL) {
3009                         PUSH_NODE(stack, this);
3010                         this = this->left;
3011                     }
3012                     PUSH_NODE(stack, this);
3013                 } else {
3014                     for (;;) {
3015                         tmp = POP_NODE(stack);
3016                         this = TOP_NODE(stack);
3017                         if (!this)
3018                             goto next_root;
3019                         if (this->left == tmp)
3020                             break;
3021                     }
3022                 }
3023                 ++(stack->slot);
3024             } else {
3025                 if (this->left != NULL) {
3026                     this = this->left;
3027                     while (this->right != NULL) {
3028                         PUSH_NODE(stack, this);
3029                         this = this->right;
3030                     }
3031                     PUSH_NODE(stack, this);
3032                 } else {
3033                     for (;;) {
3034                         tmp = POP_NODE(stack);
3035                         this = TOP_NODE(stack);
3036                         if (!this)
3037                             goto next_root;
3038                         if (this->right == tmp)
3039                             break;
3040                     }
3041                 }
3042                 --(stack->slot);
3043             }
3044         }
3045          /* Found slot */
3046         ASSERT(this);
3047         break;
3048 
3049 next_root:
3050         if (!iter) {
3051             if (stack->slot == (slot-1)) {
3052                 *is_EOT = 1;
3053             }
3054             break; /* EOT */
3055         }
3056 
3057         ASSERT(slot > stack->slot);
3058         if (lastobj) {
3059             lastkey = GETKEY(tb, lastobj->dbterm.tpl);
3060             lastobj = NULL;
3061         }
3062         pp = catree_find_next_root(iter, &lastkey);
3063         if (!pp) {
3064             if (stack->slot == (slot-1)) {
3065                 *is_EOT = 1;
3066             }
3067             break; /* EOT */
3068         }
3069         root = *pp;
3070         stack->pos = 0;
3071         find_next(&tb->common, root, stack, lastkey);
3072     }
3073 
3074     release_stack(tb,stack_container,stack);
3075     return this;
3076 }
3077 
3078 /*
3079  * Find next and previous in sort order
3080  */
3081 
find_next(DbTableCommon * tb,TreeDbTerm * root,DbTreeStack * stack,Eterm key)3082 static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
3083                              DbTreeStack* stack, Eterm key) {
3084     TreeDbTerm *this;
3085     TreeDbTerm *tmp;
3086     Sint c;
3087 
3088     if(( this = TOP_NODE(stack)) != NULL) {
3089 	if (!cmp_key_eq(tb,key,this)) {
3090 	    /* Start from the beginning */
3091 	    stack->pos = stack->slot = 0;
3092 	}
3093     }
3094     if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
3095 	if (( this = root ) == NULL)
3096 	    return NULL;
3097 	for (;;) {
3098 	    PUSH_NODE(stack, this);
3099 	    if (( c = cmp_key(tb,key,this) ) > 0) {
3100 		if (this->right == NULL) /* We are at the previos
3101 					    and the element does
3102 					    not exist */
3103 		    break;
3104 		else
3105 		    this = this->right;
3106 	    } else if (c < 0) {
3107 		if (this->left == NULL) /* Done */
3108                     goto found_next;
3109 		else
3110 		    this = this->left;
3111 	    } else
3112 		break;
3113 	}
3114     }
3115     /* The next element from this... */
3116     if (this->right != NULL) {
3117 	this = this->right;
3118 	PUSH_NODE(stack,this);
3119 	while (this->left != NULL) {
3120 	    this = this->left;
3121 	    PUSH_NODE(stack, this);
3122 	}
3123     } else {
3124 	do {
3125 	    tmp = POP_NODE(stack);
3126 	    if (( this = TOP_NODE(stack)) == NULL) {
3127 		stack->slot = 0;
3128 		return NULL;
3129 	    }
3130 	} while (this->right == tmp);
3131     }
3132 
3133 found_next:
3134     if (stack->slot > 0)
3135         ++(stack->slot);
3136 
3137     return this;
3138 }
3139 
find_prev(DbTableCommon * tb,TreeDbTerm * root,DbTreeStack * stack,Eterm key)3140 static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
3141                              DbTreeStack* stack, Eterm key) {
3142     TreeDbTerm *this;
3143     TreeDbTerm *tmp;
3144     Sint c;
3145 
3146     if(( this = TOP_NODE(stack)) != NULL) {
3147 	if (!cmp_key_eq(tb,key,this)) {
3148 	    /* Start from the beginning */
3149 	    stack->pos = stack->slot = 0;
3150 	}
3151     }
3152     if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
3153 	if (( this = root ) == NULL)
3154 	    return NULL;
3155 	for (;;) {
3156 	    PUSH_NODE(stack, this);
3157 	    if (( c = cmp_key(tb,key,this) ) < 0) {
3158 		if (this->left == NULL) /* We are at the next
3159 					   and the element does
3160 					   not exist */
3161 		    break;
3162 		else
3163 		    this = this->left;
3164 	    } else if (c > 0) {
3165 		if (this->right == NULL) /* Done */
3166                     goto found_prev;
3167 		else
3168 		    this = this->right;
3169 	    } else
3170 		break;
3171 	}
3172     }
3173     /* The previous element from this... */
3174     if (this->left != NULL) {
3175 	this = this->left;
3176 	PUSH_NODE(stack,this);
3177 	while (this->right != NULL) {
3178 	    this = this->right;
3179 	    PUSH_NODE(stack, this);
3180 	}
3181     } else {
3182 	do {
3183 	    tmp = POP_NODE(stack);
3184 	    if (( this = TOP_NODE(stack)) == NULL) {
3185 		stack->slot = 0;
3186 		return NULL;
3187 	    }
3188 	} while (this->left == tmp);
3189     }
3190 
3191 found_prev:
3192     if (stack->slot > 0)
3193         --(stack->slot);
3194 
3195     return this;
3196 }
3197 
3198 
3199 /** @brief Find object with smallest key of all larger than partially bound
3200  * key. Can be used as a starting point for a reverse iteration with pb_key.
3201  *
3202  * @param pb_key The partially bound key. Example {42, '$1'}
3203  * @param *rootpp Will return pointer to root pointer of tree with found object.
3204  * @param iter Root iterator or NULL for plain DbTableTree.
3205  * @param stack A stack to use. Will be cleared.
3206  *
3207  * @return found object or NULL if no such key exists.
3208  */
find_next_from_pb_key(DbTable * tbl,TreeDbTerm *** rootpp,DbTreeStack * stack,Eterm pb_key,CATreeRootIterator * iter)3209 static TreeDbTerm *find_next_from_pb_key(DbTable *tbl,  TreeDbTerm*** rootpp,
3210                                          DbTreeStack* stack, Eterm pb_key,
3211                                          CATreeRootIterator* iter)
3212 {
3213     TreeDbTerm* root;
3214     TreeDbTerm *this;
3215     Uint candidate = 0;
3216     Sint c;
3217 
3218     if (iter) {
3219         *rootpp = catree_find_next_from_pb_key_root(pb_key, iter);
3220         ASSERT(*rootpp);
3221         root = **rootpp;
3222     }
3223     else {
3224         *rootpp = &tbl->tree.root;
3225         root = tbl->tree.root;
3226     }
3227 
3228     /* spool the stack, we have to "re-search" */
3229     stack->pos = stack->slot = 0;
3230     if (( this = root ) == NULL)
3231 	return NULL;
3232     for (;;) {
3233 	PUSH_NODE(stack, this);
3234 	if (( c = cmp_partly_bound(pb_key,GETKEY(tbl, this->dbterm.tpl))) >= 0) {
3235 	    if (this->right == NULL) {
3236                 stack->pos = candidate;
3237                 return TOP_NODE(stack);
3238 	    }
3239             this = this->right;
3240 	} else /*if (c < 0)*/ {
3241 	    if (this->left == NULL) /* Done */
3242 		return this;
3243             candidate = stack->pos;
3244             this = this->left;
3245 	}
3246     }
3247 }
3248 
3249 /** @brief Find object with largest key of all smaller than partially bound
3250  * key. Can be used as a starting point for a forward iteration with pb_key.
3251  *
3252  * @param pb_key The partially bound key. Example {42, '$1'}
3253  * @param *rootpp Will return pointer to root pointer of found object.
3254  * @param iter Root iterator or NULL for plain DbTableTree.
3255  * @param stack A stack to use. Will be cleared.
3256  *
3257  * @return found object or NULL if no such key exists.
3258  */
find_prev_from_pb_key(DbTable * tbl,TreeDbTerm *** rootpp,DbTreeStack * stack,Eterm pb_key,CATreeRootIterator * iter)3259 static TreeDbTerm *find_prev_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
3260                                          DbTreeStack* stack, Eterm pb_key,
3261                                          CATreeRootIterator* iter)
3262 {
3263     TreeDbTerm* root;
3264     TreeDbTerm *this;
3265     Uint candidate = 0;
3266     Sint c;
3267 
3268     if (iter) {
3269         *rootpp = catree_find_prev_from_pb_key_root(pb_key, iter);
3270         ASSERT(*rootpp);
3271         root = **rootpp;
3272     }
3273     else {
3274         *rootpp = &tbl->tree.root;
3275         root = tbl->tree.root;
3276     }
3277 
3278     /* spool the stack, we have to "re-search" */
3279     stack->pos = stack->slot = 0;
3280     if (( this = root ) == NULL)
3281 	return NULL;
3282     for (;;) {
3283 	PUSH_NODE(stack, this);
3284 	if (( c = cmp_partly_bound(pb_key,GETKEY(tbl, this->dbterm.tpl))) <= 0) {
3285 	    if (this->left == NULL) {
3286                 stack->pos = candidate;
3287                 return TOP_NODE(stack);
3288 	    }
3289             this = this->left;
3290 	} else /*if (c > 0)*/ {
3291 	    if (this->right == NULL) /* Done */
3292 		return this;
3293             candidate = stack->pos;
3294             this = this->right;
3295 	}
3296     }
3297 }
3298 
3299 
3300 /*
3301  * Just lookup a node
3302  */
find_node(DbTableCommon * tb,TreeDbTerm * root,Eterm key,DbTableTree * stack_container)3303 static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root,
3304                              Eterm key, DbTableTree *stack_container)
3305 {
3306     TreeDbTerm *this;
3307     Sint res;
3308     DbTreeStack* stack = get_static_stack(stack_container);
3309 
3310     if(!stack || EMPTY_NODE(stack)
3311        || !cmp_key_eq(tb, key, (this=TOP_NODE(stack)))) {
3312 
3313 	this = root;
3314 	while (this != NULL && (res = cmp_key(tb,key,this)) != 0) {
3315 	    if (res < 0)
3316 		this = this->left;
3317 	    else
3318 		this = this->right;
3319 	}
3320     }
3321     if (stack) {
3322 	release_stack((DbTable*)tb,stack_container,stack);
3323     }
3324     return this;
3325 }
3326 
3327 
db_find_tree_node_common(DbTableCommon * tb,TreeDbTerm * root,Eterm key)3328 TreeDbTerm *db_find_tree_node_common(DbTableCommon *tb, TreeDbTerm *root,
3329                                      Eterm key)
3330 {
3331     return find_node(tb, root, key, NULL);
3332 }
3333 
3334 
3335 /*
3336  * Lookup a node and return the address of the node pointer in the tree
3337  */
find_node2(DbTableCommon * tb,TreeDbTerm ** root,Eterm key)3338 static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key)
3339 {
3340     TreeDbTerm **this;
3341     Sint res;
3342 
3343     this = root;
3344     while ((*this) != NULL && (res = cmp_key(tb, key, *this)) != 0) {
3345 	if (res < 0)
3346 	    this = &((*this)->left);
3347 	else
3348 	    this = &((*this)->right);
3349     }
3350     if (*this == NULL)
3351 	return NULL;
3352     return this;
3353 }
3354 
3355 /*
3356  * Find node and return the address of the node pointer (NULL if not found)
3357  * Tries to reuse the existing stack for performance.
3358  */
3359 
find_ptr(DbTableCommon * tb,TreeDbTerm ** root,DbTreeStack * stack,TreeDbTerm * this)3360 static TreeDbTerm **find_ptr(DbTableCommon *tb, TreeDbTerm **root,
3361                              DbTreeStack *stack, TreeDbTerm *this) {
3362     Eterm key = GETKEY(tb, this->dbterm.tpl);
3363     TreeDbTerm *tmp;
3364     TreeDbTerm *parent;
3365     Sint c;
3366 
3367     if(( tmp = TOP_NODE(stack)) != NULL) {
3368 	if (!cmp_key_eq(tb,key,tmp)) {
3369 	    /* Start from the beginning */
3370 	    stack->pos = stack->slot = 0;
3371 	}
3372     }
3373     if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
3374 	if (( tmp = *root ) == NULL)
3375 	    return NULL;
3376 	for (;;) {
3377 	    PUSH_NODE(stack, tmp);
3378 	    if (( c = cmp_key(tb,key,tmp) ) < 0) {
3379 		if (tmp->left == NULL) /* We are at the next
3380 					   and the element does
3381 					   not exist */
3382 		    break;
3383 		else
3384 		    tmp = tmp->left;
3385 	    } else if (c > 0) {
3386 		if (tmp->right == NULL) /* Done */
3387 		    return NULL;
3388 		else
3389 		    tmp = tmp->right;
3390 	    } else
3391 		break;
3392 	}
3393     }
3394 
3395     if (TOP_NODE(stack) != this)
3396         return NULL;
3397 
3398     parent = TOPN_NODE(stack, 1);
3399     if (parent == NULL)
3400         return ((this != *root) ? NULL : root);
3401     if (parent->left == this)
3402         return &(parent->left);
3403     if (parent->right == this)
3404         return &(parent->right);
3405     return NULL;
3406 }
3407 
db_lookup_dbterm_tree_common(Process * p,DbTable * tbl,TreeDbTerm ** root,Eterm key,Eterm obj,DbUpdateHandle * handle,DbTableTree * stack_container)3408 int db_lookup_dbterm_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
3409                                  Eterm key, Eterm obj, DbUpdateHandle* handle,
3410                                  DbTableTree *stack_container)
3411 {
3412     TreeDbTerm **pp = find_node2(&tbl->common, root, key);
3413     int flags = 0;
3414 
3415     if (pp == NULL) {
3416         if (obj == THE_NON_VALUE) {
3417             return 0;
3418         } else {
3419             Eterm *objp = tuple_val(obj);
3420             int arity = arityval(*objp);
3421             Eterm *htop, *hend;
3422 
3423             ASSERT(arity >= tbl->common.keypos);
3424             htop = HAlloc(p, arity + 1);
3425             hend = htop + arity + 1;
3426             sys_memcpy(htop, objp, sizeof(Eterm) * (arity + 1));
3427             htop[tbl->common.keypos] = key;
3428             obj = make_tuple(htop);
3429 
3430             if (db_put_tree_common(&tbl->common, root,
3431                                    obj, 1, stack_container) != DB_ERROR_NONE) {
3432                 return 0;
3433             }
3434 
3435             pp = find_node2(&tbl->common, root, key);
3436             ASSERT(pp != NULL);
3437             HRelease(p, hend, htop);
3438             flags |= DB_NEW_OBJECT;
3439         }
3440     }
3441 
3442     handle->tb = tbl;
3443     handle->dbterm = &(*pp)->dbterm;
3444     handle->flags = flags;
3445     handle->bp = (void**) pp;
3446     handle->new_size = (*pp)->dbterm.size;
3447     return 1;
3448 }
3449 
3450 static int
db_lookup_dbterm_tree(Process * p,DbTable * tbl,Eterm key,Eterm obj,DbUpdateHandle * handle)3451 db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
3452                       DbUpdateHandle* handle)
3453 {
3454     DbTableTree *tb = &tbl->tree;
3455     return db_lookup_dbterm_tree_common(p, tbl, &tb->root, key, obj, handle, tb);
3456 }
3457 
db_finalize_dbterm_tree_common(int cret,DbUpdateHandle * handle,TreeDbTerm ** root,DbTableTree * stack_container)3458 void db_finalize_dbterm_tree_common(int cret,
3459                                     DbUpdateHandle *handle,
3460                                     TreeDbTerm **root,
3461                                     DbTableTree *stack_container)
3462 {
3463     DbTable *tbl = handle->tb;
3464     TreeDbTerm *bp = (TreeDbTerm *) *handle->bp;
3465 
3466     if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) {
3467         Eterm ret;
3468         db_erase_tree_common(tbl,
3469                              root,
3470                              GETKEY(&tbl->common, bp->dbterm.tpl),
3471                              &ret,
3472                              (stack_container == NULL ?
3473                               NULL : &stack_container->static_stack));
3474     } else if (handle->flags & DB_MUST_RESIZE) {
3475 	db_finalize_resize(handle, offsetof(TreeDbTerm,dbterm));
3476         reset_static_stack(stack_container);
3477 
3478         free_term(tbl, bp);
3479     }
3480 #ifdef DEBUG
3481     handle->dbterm = 0;
3482 #endif
3483     return;
3484 }
3485 
3486 static void
db_finalize_dbterm_tree(int cret,DbUpdateHandle * handle)3487 db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle)
3488 {
3489     DbTable *tbl = handle->tb;
3490     DbTableTree *tb = &tbl->tree;
3491     db_finalize_dbterm_tree_common(cret, handle, &tb->root, tb);
3492 }
3493 
db_get_binary_info_tree(Process * p,DbTable * tbl,Eterm key,Eterm * ret)3494 static int db_get_binary_info_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
3495 {
3496     *ret = db_binary_info_tree_common(p, find_node(&tbl->common, tbl->tree.root,
3497                                                    key, &tbl->tree));
3498     return DB_ERROR_NONE;
3499 }
3500 
db_binary_info_tree_common(Process * p,TreeDbTerm * this)3501 Eterm db_binary_info_tree_common(Process* p, TreeDbTerm* this)
3502 {
3503     Eterm *hp, *hp_end;
3504     Uint hsz;
3505     Eterm ret;
3506 
3507     if (this == NULL) {
3508 	ret = NIL;
3509     } else {
3510         ErlOffHeap oh;
3511         hsz = 0;
3512 
3513         oh.first = this->dbterm.first_oh;
3514         erts_bld_bin_list(NULL, &hsz, &oh, NIL);
3515 
3516         hp = HAlloc(p, hsz);
3517         hp_end = hp + hsz;
3518         oh.first = this->dbterm.first_oh;
3519         ret = erts_bld_bin_list(&hp, NULL, &oh, NIL);
3520         ASSERT(hp == hp_end); (void)hp_end;
3521     }
3522     return ret;
3523 }
3524 
3525 
db_eterm_to_dbterm_tree_common(int compress,int keypos,Eterm obj)3526 void* db_eterm_to_dbterm_tree_common(int compress, int keypos, Eterm obj)
3527 {
3528     TreeDbTerm* term = new_dbterm_no_tab(compress, keypos, obj);
3529     term->left = NULL;
3530     term->right = NULL;
3531     return term;
3532 }
3533 
db_dbterm_list_prepend_tree_common(void * list,void * db_term)3534 void* db_dbterm_list_prepend_tree_common(void *list, void *db_term)
3535 {
3536     TreeDbTerm* l = list;
3537     TreeDbTerm* t = db_term;
3538     t->left = l;
3539     return t;
3540 }
3541 
db_dbterm_list_remove_first_tree_common(void ** list)3542 void* db_dbterm_list_remove_first_tree_common(void **list)
3543 {
3544     if (*list == NULL) {
3545         return NULL;
3546     } else {
3547         TreeDbTerm* t = (*list);
3548         TreeDbTerm* l = t->left;
3549         *list = l;
3550         return t;
3551     }
3552 }
3553 
3554 /*
3555  * Frees a TreeDbTerm without updating the memory footprint of the
3556  * table.
3557  */
db_free_dbterm_tree_common(int compressed,void * obj)3558 void db_free_dbterm_tree_common(int compressed, void* obj)
3559 {
3560     TreeDbTerm* p = obj;
3561     db_free_term_no_tab(compressed, p, offsetof(TreeDbTerm, dbterm));
3562 }
3563 
db_get_dbterm_key_tree_common(DbTable * tb,void * db_term)3564 Eterm db_get_dbterm_key_tree_common(DbTable* tb, void* db_term)
3565 {
3566     TreeDbTerm *term = db_term;
3567     return GETKEY(tb, term->dbterm.tpl);
3568 }
3569 
3570 /*
3571  * Traverse the tree with a callback function, used by db_match_xxx
3572  */
traverse_backwards(DbTableCommon * tb,DbTreeStack * stack,Eterm lastkey,traverse_doit_funcT * doit,struct select_common * context,CATreeRootIterator * iter)3573 static void traverse_backwards(DbTableCommon *tb,
3574 			       DbTreeStack* stack,
3575 			       Eterm lastkey,
3576                                traverse_doit_funcT* doit,
3577                                struct select_common *context,
3578                                CATreeRootIterator* iter)
3579 {
3580     TreeDbTerm *this, *next;
3581     TreeDbTerm** root = context->root;
3582 
3583     if (lastkey == THE_NON_VALUE) {
3584         if (iter) {
3585             while (*root == NULL) {
3586                 root = catree_find_prev_root(iter, NULL);
3587                 if (!root)
3588                     return;
3589             }
3590             context->root = root;
3591         }
3592         stack->pos = stack->slot = 0;
3593         next = *root;
3594         while (next != NULL) {
3595             PUSH_NODE(stack, next);
3596             next = next->right;
3597         }
3598         next = TOP_NODE(stack);
3599     } else {
3600         next = find_prev(tb, *root, stack, lastkey);
3601     }
3602 
3603     while (1) {
3604         while (next) {
3605             this = next;
3606             lastkey = GETKEY(tb, this->dbterm.tpl);
3607             next = find_prev(tb, *root, stack, lastkey);
3608             if (!((*doit)(tb, this, context, 0)))
3609                 return;
3610         }
3611 
3612         if (!iter)
3613             return;
3614         ASSERT(is_value(lastkey));
3615         root = catree_find_prev_root(iter, &lastkey);
3616         if (!root)
3617             return;
3618         context->root = root;
3619         stack->pos = stack->slot = 0;
3620         next = find_prev(tb, *root, stack, lastkey);
3621     }
3622 }
3623 
3624 /*
3625  * Traverse the tree with a callback function, used by db_match_xxx
3626  */
traverse_forward(DbTableCommon * tb,DbTreeStack * stack,Eterm lastkey,traverse_doit_funcT * doit,struct select_common * context,CATreeRootIterator * iter)3627 static void traverse_forward(DbTableCommon *tb,
3628 			     DbTreeStack* stack,
3629 			     Eterm lastkey,
3630                              traverse_doit_funcT* doit,
3631                              struct select_common *context,
3632                              CATreeRootIterator* iter)
3633 {
3634     TreeDbTerm *this, *next;
3635     TreeDbTerm **root = context->root;
3636 
3637     if (lastkey == THE_NON_VALUE) {
3638         if (iter) {
3639             while (*root == NULL) {
3640                 root = catree_find_next_root(iter, NULL);
3641                 if (!root)
3642                     return;
3643             }
3644             context->root = root;
3645         }
3646         stack->pos = stack->slot = 0;
3647         next = *root;
3648         while (next != NULL) {
3649             PUSH_NODE(stack, next);
3650             next = next->left;
3651         }
3652         next = TOP_NODE(stack);
3653     } else {
3654         next = find_next(tb, *root, stack, lastkey);
3655     }
3656 
3657     while (1) {
3658         while (next) {
3659             this = next;
3660             lastkey = GETKEY(tb, this->dbterm.tpl);
3661             next = find_next(tb, *root, stack, lastkey);
3662             if (!((*doit)(tb, this, context, 1)))
3663                 return;
3664         }
3665 
3666         if (!iter)
3667             return;
3668         ASSERT(is_value(lastkey));
3669         root = catree_find_next_root(iter, &lastkey);
3670         if (!root)
3671             return;
3672         context->root = root;
3673         stack->pos = stack->slot = 0;
3674         next = find_next(tb, *root, stack, lastkey);
3675     }
3676 }
3677 
3678 /*
3679  * Traverse the tree with an update callback function, used by db_select_replace
3680  */
traverse_update_backwards(DbTableCommon * tb,DbTreeStack * stack,Eterm lastkey,int (* doit)(DbTableCommon *,TreeDbTerm **,struct select_common *,int),struct select_common * context,CATreeRootIterator * iter)3681 static void traverse_update_backwards(DbTableCommon *tb,
3682                                       DbTreeStack* stack,
3683                                       Eterm lastkey,
3684                                       int (*doit)(DbTableCommon*,
3685                                                   TreeDbTerm**,
3686                                                   struct select_common*,
3687                                                   int),
3688                                       struct select_common* context,
3689                                       CATreeRootIterator* iter)
3690 {
3691     int res;
3692     TreeDbTerm *this, *next, **this_ptr;
3693     TreeDbTerm** root = context->root;
3694 
3695     if (lastkey == THE_NON_VALUE) {
3696         if (iter) {
3697             while (*root == NULL) {
3698                 root = catree_find_prev_root(iter, NULL);
3699                 if (!root)
3700                     return;
3701                 context->root = root;
3702             }
3703         }
3704         stack->pos = stack->slot = 0;
3705         next = *root;
3706         while (next) {
3707             PUSH_NODE(stack, next);
3708             next = next->right;
3709         }
3710         next = TOP_NODE(stack);
3711     }
3712     else
3713         next = find_prev(tb, *root, stack, lastkey);
3714 
3715 
3716     while (1) {
3717         while (next) {
3718             this = next;
3719             this_ptr = find_ptr(tb, root, stack, this);
3720             ASSERT(this_ptr != NULL);
3721             res = (*doit)(tb, this_ptr, context, 0);
3722             this = *this_ptr;
3723             REPLACE_TOP_NODE(stack, this);
3724             if (!res)
3725                 return;
3726             lastkey = GETKEY(tb, this->dbterm.tpl);
3727             next = find_prev(tb, *root, stack, lastkey);
3728         }
3729 
3730         if (!iter)
3731             return;
3732         ASSERT(is_value(lastkey));
3733         root = catree_find_prev_root(iter, &lastkey);
3734         if (!root)
3735             return;
3736         context->root = root;
3737         stack->pos = stack->slot = 0;
3738         next = find_prev(tb, *root, stack, lastkey);
3739     }
3740 }
3741 
key_boundness(DbTableCommon * tb,Eterm pattern,Eterm * keyp)3742 static enum ms_key_boundness key_boundness(DbTableCommon *tb,
3743                                            Eterm pattern, Eterm *keyp)
3744 {
3745     Eterm key;
3746 
3747     if (pattern == am_Underscore || db_is_variable(pattern) != -1)
3748 	return MS_KEY_UNBOUND;
3749     key = db_getkey(tb->keypos, pattern);
3750     if (is_non_value(key))
3751 	return MS_KEY_IMPOSSIBLE;  /* can't possibly match anything */
3752     if (!db_has_variable(key)) {   /* Bound key */
3753         *keyp = key;
3754 	return MS_KEY_BOUND;
3755     } else if (key != am_Underscore &&
3756 	       db_is_variable(key) < 0 && !db_has_map(key)) {
3757 
3758 	*keyp = key;
3759         return MS_KEY_PARTIALLY_BOUND;
3760     }
3761 
3762     return MS_KEY_UNBOUND;
3763 }
3764 
3765 
3766 
do_cmp_partly_bound(Eterm a,Eterm b,int * done)3767 static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done)
3768 {
3769     Eterm* aa;
3770     Eterm* bb;
3771     Eterm a_hdr;
3772     Eterm b_hdr;
3773     int i;
3774     Sint j;
3775 
3776     /* A variable matches anything */
3777     if (is_atom(a) && (a == am_Underscore || (db_is_variable(a) >= 0))) {
3778 	*done = 1;
3779 	return 0;
3780     }
3781     if (is_same(a,b))
3782 	return 0;
3783 
3784     switch (a & _TAG_PRIMARY_MASK) {
3785     case TAG_PRIMARY_LIST:
3786 	if (!is_list(b)) {
3787 	    return CMP(a,b);
3788 	}
3789 	aa = list_val(a);
3790 	bb = list_val(b);
3791 	while (1) {
3792 	    if ((j = do_cmp_partly_bound(*aa++, *bb++, done)) != 0 || *done)
3793 		return j;
3794 	    if (is_same(*aa, *bb))
3795 		return 0;
3796 	    if (is_not_list(*aa) || is_not_list(*bb))
3797 		return do_cmp_partly_bound(*aa, *bb, done);
3798 	    aa = list_val(*aa);
3799 	    bb = list_val(*bb);
3800 	}
3801     case TAG_PRIMARY_BOXED:
3802 	if ((b & _TAG_PRIMARY_MASK) != TAG_PRIMARY_BOXED) {
3803 	    return CMP(a,b);
3804 	}
3805 	a_hdr = ((*boxed_val(a)) & _TAG_HEADER_MASK) >> _TAG_PRIMARY_SIZE;
3806 	b_hdr = ((*boxed_val(b)) & _TAG_HEADER_MASK) >> _TAG_PRIMARY_SIZE;
3807 	if (a_hdr != b_hdr) {
3808 	    return CMP(a,b);
3809 	}
3810 	if (a_hdr == (_TAG_HEADER_ARITYVAL >> _TAG_PRIMARY_SIZE)) {
3811 	    aa = tuple_val(a);
3812 	    bb = tuple_val(b);
3813 	    /* compare the arities */
3814 	    i = arityval(*aa);	/* get the arity*/
3815 	    if (i < arityval(*bb)) return(-1);
3816 	    if (i > arityval(*bb)) return(1);
3817 	    while (i--) {
3818 		if ((j = do_cmp_partly_bound(*++aa, *++bb, done)) != 0
3819 		    || *done)
3820 		    return j;
3821 	    }
3822 	    return 0;
3823 	}
3824 	/* Drop through */
3825       default:
3826 	  return CMP(a,b);
3827     }
3828 }
3829 
cmp_partly_bound(Eterm partly_bound_key,Eterm bound_key)3830 Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key)
3831 {
3832     int done = 0;
3833     Sint ret = do_cmp_partly_bound(partly_bound_key, bound_key, &done);
3834 #ifdef HARDDEBUG
3835     erts_fprintf(stderr,"\ncmp_partly_bound: %T", partly_bound_key);
3836     if (ret < 0)
3837 	erts_fprintf(stderr," < ");
3838     else if (ret > 0)
3839 	erts_fprintf(stderr," > ");
3840     else
3841 	erts_fprintf(stderr," == ");
3842     erts_fprintf(stderr,"%T\n", bound_key);
3843 #endif
3844     return ret;
3845 }
3846 
3847 /*
3848 ** For partly_bound debugging....
3849 **
3850 BIF_RETTYPE ets_testnisse_2(BIF_ALIST_2)
3851 BIF_ADECL_2
3852 {
3853     Eterm r1 = make_small(partly_bound_can_match_lesser(BIF_ARG_1,
3854 							BIF_ARG_2));
3855     Eterm r2 = make_small(partly_bound_can_match_greater(BIF_ARG_1,
3856 							 BIF_ARG_2));
3857     Eterm *hp = HAlloc(BIF_P,3);
3858     Eterm ret;
3859 
3860     ret = TUPLE2(hp,r1,r2);
3861     BIF_RET(ret);
3862 }
3863 **
3864 */
partly_bound_can_match_lesser(Eterm partly_bound_1,Eterm partly_bound_2)3865 static int partly_bound_can_match_lesser(Eterm partly_bound_1,
3866 					 Eterm partly_bound_2)
3867 {
3868     int done = 0;
3869     int ret = do_partly_bound_can_match_lesser(partly_bound_1,
3870 					       partly_bound_2,
3871 					       &done);
3872 #ifdef HARDDEBUG
3873     erts_fprintf(stderr,"\npartly_bound_can_match_lesser: %T",partly_bound_1);
3874     if (ret)
3875 	erts_fprintf(stderr," can match lesser than ");
3876     else
3877 	erts_fprintf(stderr," cannot match lesser than ");
3878     erts_fprintf(stderr,"%T\n",partly_bound_2);
3879 #endif
3880     return ret;
3881 }
3882 
partly_bound_can_match_greater(Eterm partly_bound_1,Eterm partly_bound_2)3883 static int partly_bound_can_match_greater(Eterm partly_bound_1,
3884 					  Eterm partly_bound_2)
3885 {
3886     int done = 0;
3887     int ret = do_partly_bound_can_match_greater(partly_bound_1,
3888 						partly_bound_2,
3889 						&done);
3890 #ifdef HARDDEBUG
3891     erts_fprintf(stderr,"\npartly_bound_can_match_greater: %T",partly_bound_1);
3892     if (ret)
3893 	erts_fprintf(stderr," can match greater than ");
3894     else
3895 	erts_fprintf(stderr," cannot match greater than ");
3896     erts_fprintf(stderr,"%T\n",partly_bound_2);
3897 #endif
3898     return ret;
3899 }
3900 
do_partly_bound_can_match_lesser(Eterm a,Eterm b,int * done)3901 static int do_partly_bound_can_match_lesser(Eterm a, Eterm b,
3902 					    int *done)
3903 {
3904     Eterm* aa;
3905     Eterm* bb;
3906     Sint i;
3907     int j;
3908 
3909     if (is_atom(a) && (a == am_Underscore ||
3910 		       (db_is_variable(a) >= 0))) {
3911 	*done = 1;
3912 	if (is_atom(b) && (b == am_Underscore ||
3913 			   (db_is_variable(b) >= 0))) {
3914 	    return 0;
3915 	} else {
3916 	    return 1;
3917 	}
3918     } else if (is_atom(b) && (b == am_Underscore ||
3919 			      (db_is_variable(b) >= 0))) {
3920 	*done = 1;
3921 	return 0;
3922     }
3923 
3924     if (a == b)
3925 	return 0;
3926 
3927     if (not_eq_tags(a,b)) {
3928 	*done = 1;
3929 	return (CMP(a, b) < 0) ? 1 : 0;
3930     }
3931 
3932     /* we now know that tags are the same */
3933     switch (tag_val_def(a)) {
3934     case TUPLE_DEF:
3935 	aa = tuple_val(a);
3936 	bb = tuple_val(b);
3937 	/* compare the arities */
3938 	if (arityval(*aa) < arityval(*bb)) return 1;
3939 	if (arityval(*aa) > arityval(*bb)) return 0;
3940 	i = arityval(*aa);	/* get the arity*/
3941 	while (i--) {
3942 	    if ((j = do_partly_bound_can_match_lesser(*++aa, *++bb,
3943 						      done)) != 0
3944 		|| *done)
3945 		return j;
3946 	}
3947 	return 0;
3948     case LIST_DEF:
3949 	aa = list_val(a);
3950 	bb = list_val(b);
3951 	while (1) {
3952 	    if ((j = do_partly_bound_can_match_lesser(*aa++, *bb++,
3953 						      done)) != 0
3954 		|| *done)
3955 		return j;
3956 	    if (*aa==*bb)
3957 		return 0;
3958 	    if (is_not_list(*aa) || is_not_list(*bb))
3959 		return do_partly_bound_can_match_lesser(*aa, *bb,
3960 							done);
3961 	    aa = list_val(*aa);
3962 	    bb = list_val(*bb);
3963 	}
3964     default:
3965 	if((i = CMP(a, b)) != 0) {
3966 	    *done = 1;
3967 	}
3968 	return (i < 0) ? 1 : 0;
3969     }
3970 }
3971 
do_partly_bound_can_match_greater(Eterm a,Eterm b,int * done)3972 static int do_partly_bound_can_match_greater(Eterm a, Eterm b,
3973 					    int *done)
3974 {
3975     Eterm* aa;
3976     Eterm* bb;
3977     Sint i;
3978     int j;
3979 
3980     if (is_atom(a) && (a == am_Underscore ||
3981 		       (db_is_variable(a) >= 0))) {
3982 	*done = 1;
3983 	if (is_atom(b) && (b == am_Underscore ||
3984 			   (db_is_variable(b) >= 0))) {
3985 	    return 0;
3986 	} else {
3987 	    return 1;
3988 	}
3989     } else if (is_atom(b) && (b == am_Underscore ||
3990 			      (db_is_variable(b) >= 0))) {
3991 	*done = 1;
3992 	return 0;
3993     }
3994 
3995     if (a == b)
3996 	return 0;
3997 
3998     if (not_eq_tags(a,b)) {
3999 	*done = 1;
4000 	return (CMP(a, b) > 0) ? 1 : 0;
4001     }
4002 
4003     /* we now know that tags are the same */
4004     switch (tag_val_def(a)) {
4005     case TUPLE_DEF:
4006 	aa = tuple_val(a);
4007 	bb = tuple_val(b);
4008 	/* compare the arities */
4009 	if (arityval(*aa) < arityval(*bb)) return 0;
4010 	if (arityval(*aa) > arityval(*bb)) return 1;
4011 	i = arityval(*aa);	/* get the arity*/
4012 	while (i--) {
4013 	    if ((j = do_partly_bound_can_match_greater(*++aa, *++bb,
4014 						      done)) != 0
4015 		|| *done)
4016 		return j;
4017 	}
4018 	return 0;
4019     case LIST_DEF:
4020 	aa = list_val(a);
4021 	bb = list_val(b);
4022 	while (1) {
4023 	    if ((j = do_partly_bound_can_match_greater(*aa++, *bb++,
4024 						      done)) != 0
4025 		|| *done)
4026 		return j;
4027 	    if (*aa==*bb)
4028 		return 0;
4029 	    if (is_not_list(*aa) || is_not_list(*bb))
4030 		return do_partly_bound_can_match_greater(*aa, *bb,
4031 							done);
4032 	    aa = list_val(*aa);
4033 	    bb = list_val(*bb);
4034 	}
4035     default:
4036 	if((i = CMP(a, b)) != 0) {
4037 	    *done = 1;
4038 	}
4039 	return (i > 0) ? 1 : 0;
4040     }
4041 }
4042 
4043 /*
4044  * Callback functions for the different match functions
4045  */
4046 
doit_select(DbTableCommon * tb,TreeDbTerm * this,struct select_common * ptr,int forward)4047 static int doit_select(DbTableCommon *tb, TreeDbTerm *this,
4048                        struct select_common* ptr,
4049 		       int forward)
4050 {
4051     struct select_context *sc = (struct select_context *) ptr;
4052     Eterm ret;
4053 
4054     sc->lastobj = this->dbterm.tpl;
4055 
4056     if (sc->end_condition != NIL &&
4057 	((forward &&
4058 	  cmp_partly_bound(sc->end_condition,
4059 			   GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) < 0) ||
4060 	 (!forward &&
4061 	  cmp_partly_bound(sc->end_condition,
4062 			   GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0))) {
4063 	return 0;
4064     }
4065     ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, ERTS_PAM_COPY_RESULT);
4066     if (is_value(ret)) {
4067         Eterm *hp = HAlloc(sc->p, 2);
4068 	sc->accum = CONS(hp, ret, sc->accum);
4069     }
4070     if (--(sc->max) <= 0) {
4071 	return 0;
4072     }
4073     return 1;
4074 }
4075 
doit_select_count(DbTableCommon * tb,TreeDbTerm * this,struct select_common * ptr,int forward)4076 static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this,
4077                              struct select_common* ptr,
4078 			     int forward)
4079 {
4080     struct select_count_context *sc = (struct select_count_context *) ptr;
4081     Eterm ret;
4082 
4083     sc->lastobj = this->dbterm.tpl;
4084 
4085     /* Always backwards traversing */
4086     if (sc->end_condition != NIL &&
4087 	(cmp_partly_bound(sc->end_condition,
4088 			  GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0)) {
4089 	return 0;
4090     }
4091     ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, ERTS_PAM_TMP_RESULT);
4092     if (ret == am_true) {
4093 	++(sc->got);
4094     }
4095     if (--(sc->max) <= 0) {
4096 	return 0;
4097     }
4098     return 1;
4099 }
4100 
doit_select_chunk(DbTableCommon * tb,TreeDbTerm * this,struct select_common * ptr,int forward)4101 static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this,
4102                              struct select_common* ptr,
4103 			     int forward)
4104 {
4105     struct select_context *sc = (struct select_context *) ptr;
4106     Eterm ret;
4107 
4108     sc->lastobj = this->dbterm.tpl;
4109 
4110     if (sc->end_condition != NIL &&
4111 	((forward &&
4112 	  cmp_partly_bound(sc->end_condition,
4113 			   GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) < 0) ||
4114 	 (!forward &&
4115 	  cmp_partly_bound(sc->end_condition,
4116 			   GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0))) {
4117 	return 0;
4118     }
4119 
4120     ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, ERTS_PAM_COPY_RESULT);
4121     if (is_value(ret)) {
4122         Eterm *hp = HAlloc(sc->p, 2);
4123 	++(sc->got);
4124 	sc->accum = CONS(hp, ret, sc->accum);
4125     }
4126     if (--(sc->max) <= 0 || sc->got == sc->chunk_size) {
4127 	return 0;
4128     }
4129     return 1;
4130 }
4131 
4132 
doit_select_delete(DbTableCommon * tb,TreeDbTerm * this,struct select_common * ptr,int forward)4133 static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this,
4134                               struct select_common *ptr,
4135 			      int forward)
4136 {
4137     struct select_delete_context *sc = (struct select_delete_context *) ptr;
4138     Eterm ret;
4139     Eterm key;
4140 
4141     if (sc->erase_lastterm)
4142 	free_term((DbTable*)tb, sc->lastterm);
4143     sc->erase_lastterm = 0;
4144     sc->lastterm = this;
4145 
4146     if (sc->end_condition != NIL &&
4147 	cmp_partly_bound(sc->end_condition,
4148 			 GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0)
4149 	return 0;
4150     ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, ERTS_PAM_TMP_RESULT);
4151     if (ret == am_true) {
4152 	key = GETKEY(sc->tb, this->dbterm.tpl);
4153 	linkout_tree(sc->tb, sc->common.root, key, sc->stack);
4154 	sc->erase_lastterm = 1;
4155 	++sc->accum;
4156     }
4157     if (--(sc->max) <= 0) {
4158 	return 0;
4159     }
4160     return 1;
4161 }
4162 
doit_select_replace(DbTableCommon * tb,TreeDbTerm ** this,struct select_common * ptr,int forward)4163 static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this,
4164                                struct select_common* ptr,
4165                                int forward)
4166 {
4167     struct select_replace_context *sc = (struct select_replace_context *) ptr;
4168     DbTerm* obj;
4169     Eterm ret;
4170 
4171     sc->lastobj = (*this)->dbterm.tpl;
4172 
4173     /* Always backwards traversing */
4174     if (sc->end_condition != NIL &&
4175 	(cmp_partly_bound(sc->end_condition,
4176 			  GETKEY_WITH_POS(sc->keypos, (*this)->dbterm.tpl)) > 0)) {
4177 	return 0;
4178     }
4179     obj = &(*this)->dbterm;
4180     if (tb->compress)
4181         obj = db_alloc_tmp_uncompressed(tb, obj);
4182     ret = db_match_dbterm_uncompressed(tb, sc->p, sc->mp, obj, ERTS_PAM_TMP_RESULT);
4183 
4184     if (is_value(ret)) {
4185         TreeDbTerm* new;
4186         TreeDbTerm* old = *this;
4187 #ifdef DEBUG
4188         Eterm key = db_getkey(tb->keypos, ret);
4189         ASSERT(is_value(key));
4190         ASSERT(cmp_key(tb, key, old) == 0);
4191 #endif
4192         new = new_dbterm(tb, ret);
4193         new->left = old->left;
4194         new->right = old->right;
4195         new->balance = old->balance;
4196         sc->lastobj = new->dbterm.tpl;
4197         *this = new;
4198         free_term((DbTable*)tb, old);
4199         ++(sc->replaced);
4200     }
4201     if (tb->compress)
4202         db_free_tmp_uncompressed(obj);
4203     if (--(sc->max) <= 0) {
4204 	return 0;
4205     }
4206     return 1;
4207 }
4208 
4209 void
erts_db_foreach_thr_prgr_offheap_tree(void (* func)(ErlOffHeap *,void *),void * arg)4210 erts_db_foreach_thr_prgr_offheap_tree(void (*func)(ErlOffHeap *, void *),
4211                                       void *arg)
4212 {
4213 }
4214 
4215 #ifdef TREE_DEBUG
do_dump_tree2(DbTableCommon * tb,int to,void * to_arg,int show,TreeDbTerm * t,int offset)4216 static void do_dump_tree2(DbTableCommon* tb, int to, void *to_arg, int show,
4217 			  TreeDbTerm *t, int offset)
4218 {
4219     if (t == NULL)
4220 	return;
4221     do_dump_tree2(tb, to, to_arg, show, t->right, offset + 4);
4222     if (show) {
4223 	const char* prefix;
4224 	Eterm term;
4225 	if (tb->compress) {
4226 	    prefix = "key=";
4227 	    term = GETKEY(tb, t->dbterm.tpl);
4228 	}
4229 	else {
4230 	    prefix = "";
4231 	    term = make_tuple(t->dbterm.tpl);
4232 	}
4233 	erts_print(to, to_arg, "%*s%s%T (addr = %p, bal = %d)\n",
4234 		   offset, "", prefix, term, t, t->balance);
4235     }
4236     do_dump_tree2(tb, to, to_arg, show, t->left, offset + 4);
4237 }
4238 
4239 #endif
4240 
4241 #ifdef HARDDEBUG
4242 
4243 /*
4244  * No called, but kept as it might come to use
4245  */
db_check_table_tree(DbTable * tbl)4246 void db_check_table_tree(DbTable *tbl)
4247 {
4248     DbTableTree *tb = &tbl->tree;
4249     check_table_tree(tb, tb->root);
4250     check_saved_stack(tb);
4251     check_slot_pos(tb);
4252 }
4253 
traverse_until(TreeDbTerm * t,int * current,int to)4254 static TreeDbTerm *traverse_until(TreeDbTerm *t, int *current, int to)
4255 {
4256     TreeDbTerm *tmp;
4257     if (t == NULL)
4258 	return NULL;
4259     tmp = traverse_until(t->left, current, to);
4260     if (tmp != NULL)
4261 	return tmp;
4262     ++(*current);
4263     if (*current == to)
4264 	return t;
4265     return traverse_until(t->right, current, to);
4266 }
4267 
check_slot_pos(DbTableTree * tb)4268 static void check_slot_pos(DbTableTree *tb)
4269 {
4270     int pos = 0;
4271     TreeDbTerm *t;
4272     if (tb->stack.slot == 0 || tb->stack.pos == 0)
4273 	return;
4274     t = traverse_until(tb->root, &pos, tb->stack.slot);
4275     if (t != tb->stack.array[tb->stack.pos - 1]) {
4276 	erts_fprintf(stderr, "Slot position does not correspont with stack, "
4277 		   "element position %d is really 0x%08X, when stack says "
4278 		   "it's 0x%08X\n", tb->stack.slot, t,
4279 		   tb->stack.array[tb->stack.pos - 1]);
4280 	do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
4281     }
4282 }
4283 
4284 
check_saved_stack(DbTableTree * tb)4285 static void check_saved_stack(DbTableTree *tb)
4286 {
4287      TreeDbTerm *t = tb->root;
4288      DbTreeStack* stack = &tb->static_stack;
4289      int n = 0;
4290      if (stack->pos == 0)
4291 	 return;
4292      if (t != stack->array[0]) {
4293 	 erts_fprintf(stderr,"tb->stack[0] is 0x%08X, should be 0x%08X\n",
4294 		      stack->array[0], t);
4295 	 do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
4296 	 return;
4297      }
4298      while (n < stack->pos) {
4299 	 if (t == NULL) {
4300 	     erts_fprintf(stderr, "NULL pointer in tree when stack not empty,"
4301 			" stack depth is %d\n", n);
4302 	     do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
4303 	     return;
4304 	 }
4305 	 n++;
4306 	 if (n < stack->pos) {
4307 	     if (stack->array[n] == t->left)
4308 		 t = t->left;
4309 	     else if (stack->array[n] == t->right)
4310 		 t = t->right;
4311 	     else {
4312 		 erts_fprintf(stderr, "tb->stack[%d] == 0x%08X does not "
4313 			    "represent child pointer in tree!"
4314 			    "(left == 0x%08X, right == 0x%08X\n",
4315 			    n, tb->stack[n], t->left, t->right);
4316 		 do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
4317 		 return;
4318 	     }
4319 	 }
4320      }
4321 }
4322 
check_table_tree(DbTableTree * tb,TreeDbTerm * t)4323 static int check_table_tree(DbTableTree* tb, TreeDbTerm *t)
4324 {
4325     int lh, rh;
4326     if (t == NULL)
4327 	return 0;
4328     lh = check_table_tree(tb, t->left);
4329     rh = check_table_tree(tb, t->right);
4330     if ((rh - lh) != t->balance) {
4331 	erts_fprintf(stderr, "Invalid tree balance for this node:\n");
4332 	erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n",
4333 		     t->balance, t->left, t->right);
4334 	erts_fprintf(stderr,"\nDump:\n---------------------------------\n");
4335 	do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, t, 0);
4336 	erts_fprintf(stderr,"\n---------------------------------\n");
4337     }
4338     return ((rh > lh) ? rh : lh) + 1;
4339 }
4340 
4341 #endif
4342