1 /*
2  * %CopyrightBegin%
3  *
4  * Copyright Ericsson AB 1996-2020. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * %CopyrightEnd%
19  */
20 
21 /*
22  * This file contains the 'ets' bif interface functions.
23  */
24 
25 /*
26 #ifdef DEBUG
27 #define HARDDEBUG 1
28 #endif
29 */
30 
31 #ifdef HAVE_CONFIG_H
32 #  include "config.h"
33 #endif
34 
35 #include "sys.h"
36 #include "erl_vm.h"
37 #include "global.h"
38 #include "erl_process.h"
39 #include "error.h"
40 #define ERTS_WANT_DB_INTERNAL__
41 #include "erl_db.h"
42 #include "bif.h"
43 #include "big.h"
44 #include "erl_binary.h"
45 #include "bif.h"
46 
47 /*
48  * Extended error information for ETS functions.
49  */
50 
/*
 * Atoms describing why an ETS operation failed. In this file one of
 * them is stored in the 'fvalue' field of the calling process when a
 * table lookup fails with BADARG | EXF_HAS_EXT_INFO (e.g. EXI_TYPE,
 * EXI_ID and EXI_ACCESS are set by the table lookup helpers).
 */
#define EXI_TYPE     am_type	/* The type is wrong. */
#define EXI_ID       am_id	/* The table identifier is invalid. */
#define EXI_ACCESS   am_access /* Insufficient access rights for ETS table. */
#define EXI_TAB_TYPE am_table_type /* Unsupported table type for this operation. */
#define EXI_BAD_KEY  am_badkey	/* No such key exists in the table. */
#define EXI_KEY_POS  am_keypos /* The element to update is also the key. */
#define EXI_POSITION am_position /* The position is out of range. */
#define EXI_OWNER    am_owner	 /* The receiving process is already the owner. */
#define EXI_NOT_OWNER am_not_owner /* The current process is not the owner. */

/* NOTE(review): presumably the counter adjusted by ERTS_ETS_MISC_MEM_ADD()
 * (used when growing the named-table buckets) -- confirm in erl_db.h. */
erts_atomic_t erts_ets_misc_mem_size;
62 
63 /*
64 ** Utility macros
65 */
66 
/*
 * DBG_RANDOM_REDS(REDS, SEED): reduction budget to use for a yielding
 * operation. In DEBUG builds the budget is REDS scaled by 0.1 and by the
 * value returned from erts_sched_local_random_float(SEED); in other
 * builds it expands to REDS unchanged.
 */
#if defined(DEBUG)
# define DBG_RANDOM_REDS(REDS, SEED) \
         ((REDS) * 0.1 * erts_sched_local_random_float(SEED))
#else
# define DBG_RANDOM_REDS(REDS, SEED) (REDS)
#endif
73 
74 
75 
/*
 * Table lookup macros. All of them may 'return' from the enclosing
 * function, so they can only be used inside functions returning
 * BIF_RETTYPE.
 *
 * DB_BIF_GET_TABLE:  looks up the table identified by BIF_ARG_1 on behalf
 *                    of BIF_P; a failure trap targets the BIF with index
 *                    BIF_IX.
 * DB_TRAP_GET_TABLE: same, but with an explicit table id (TID) and an
 *                    explicit trap export (BIF_EXP).
 */
#define DB_BIF_GET_TABLE(TB, WHAT, KIND, BIF_IX) \
        DB_GET_TABLE(TB, BIF_ARG_1, WHAT, KIND, BIF_IX, NULL, BIF_P)

#define DB_TRAP_GET_TABLE(TB, TID, WHAT, KIND, BIF_EXP) \
        DB_GET_TABLE(TB, TID, WHAT, KIND, 0, BIF_EXP, BIF_P)

/*
 * DB_GET_TABLE: assigns the result of db_get_table() to TB. If that
 * result is NULL the enclosing function returns the value produced by
 * db_bif_fail(), passing along the failure reason written by
 * db_get_table().
 */
#define DB_GET_TABLE(TB, TID, WHAT, KIND, BIF_IX, BIF_EXP, PROC)         \
do {                                                                     \
    Uint freason__;                                                      \
    if (!(TB = db_get_table(PROC, TID, WHAT, KIND, &freason__))) {       \
        return db_bif_fail(PROC, freason__, BIF_IX, BIF_EXP);            \
    }                                                                    \
}while(0)

/* Approximate reads of the per-table item and memory counters. */
#define DB_GET_APPROX_NITEMS(DB)                                        \
    erts_flxctr_read_approx(&(DB)->common.counters, ERTS_DB_TABLE_NITEMS_COUNTER_ID)
#define DB_GET_APPROX_MEM_CONSUMED(DB)                                  \
    erts_flxctr_read_approx(&(DB)->common.counters, ERTS_DB_TABLE_MEM_COUNTER_ID)
94 
db_bif_fail(Process * p,Uint freason,Uint bif_ix,Export * bif_exp)95 static BIF_RETTYPE db_bif_fail(Process* p, Uint freason,
96                                Uint bif_ix, Export* bif_exp)
97 {
98     if (freason == TRAP) {
99         if (!bif_exp) {
100             bif_exp = BIF_TRAP_EXPORT(bif_ix);
101         }
102 
103         ERTS_BIF_PREP_TRAP(bif_exp, p, bif_exp->info.mfa.arity);
104     }
105 
106     p->freason = freason;
107     return THE_NON_VALUE;
108 }
109 
110 
111 /* Get a key from any table structure and a tagged object */
/* Get a key from any table structure and a tagged object */
#define TERM_GETKEY(tb, obj) db_getkey((tb)->common.keypos, (obj))

/*
 * ITERATION_SAFETY(Proc,Tab) evaluates to:
 *   ITER_SAFE         for tree and catree tables, or when ONLY_WRITER(Proc,Tab)
 *                     holds;
 *   ITER_UNSAFE       otherwise, if the table status has DB_FINE_LOCKED set;
 *   ITER_SAFE_LOCKED  in all remaining cases.
 */
#  define ITERATION_SAFETY(Proc,Tab) \
    ((IS_TREE_TABLE((Tab)->common.status) || IS_CATREE_TABLE((Tab)->common.status) \
      || ONLY_WRITER(Proc,Tab)) ? ITER_SAFE                             \
     : (((Tab)->common.status & DB_FINE_LOCKED) ? ITER_UNSAFE : ITER_SAFE_LOCKED))

/* True when Ret is the non-value and process P's failure reason is TRAP. */
#define DID_TRAP(P,Ret) (!is_value(Ret) && ((P)->freason == TRAP))
120 
121 /*
122  * "fixed_tabs": list of all fixed tables for a process
123  */
#ifdef DEBUG
/* Debug-only membership test; only referenced from ASSERT() expressions. */
static int fixed_tabs_find(DbFixation* first, DbFixation* fix);
#endif
127 
/*
 * Link 'fix' into the circular, doubly linked list of fixations kept in
 * the ERTS_PSD_ETS_FIXED_TABLES slot of process 'p'.
 *
 * An empty slot makes 'fix' a one-element ring and the new list head.
 * Otherwise 'fix' is linked in just before the current head (i.e. at the
 * tail of the ring) and the head is left unchanged.
 */
static void fixed_tabs_insert(Process* p, DbFixation* fix)
{
    DbFixation* head = erts_psd_get(p, ERTS_PSD_ETS_FIXED_TABLES);

    if (head) {
        DbFixation* tail;

        ASSERT(!fixed_tabs_find(head, fix));
        tail = head->tabs.prev;
        fix->tabs.prev = tail;
        fix->tabs.next = head;
        tail->tabs.next = fix;
        head->tabs.prev = fix;
        return;
    }

    /* First fixation for this process: ring of one element. */
    fix->tabs.prev = fix;
    fix->tabs.next = fix;
    erts_psd_set(p, ERTS_PSD_ETS_FIXED_TABLES, fix);
}
144 
/*
 * Unlink 'fix' from the circular list of fixations stored in the
 * ERTS_PSD_ETS_FIXED_TABLES slot of process 'p'.
 *
 * If 'fix' is the only element the slot is reset to NULL. Otherwise the
 * neighbours are joined, and if 'fix' was the list head its successor
 * becomes the new head.
 */
static void fixed_tabs_delete(Process *p, DbFixation* fix)
{
    DbFixation *head;
    DbFixation *before;
    DbFixation *after;

    if (fix->tabs.next == fix) {
        /* Ring of one element: the list becomes empty. */
        DbFixation* prev_head;

        ASSERT(fix->tabs.prev == fix);
        prev_head = erts_psd_set(p, ERTS_PSD_ETS_FIXED_TABLES, NULL);
        ASSERT(prev_head == fix); (void)prev_head;
        return;
    }

    head = (DbFixation*) erts_psd_get(p, ERTS_PSD_ETS_FIXED_TABLES);
    ASSERT(fixed_tabs_find(head, fix));

    before = fix->tabs.prev;
    after = fix->tabs.next;
    before->tabs.next = after;
    after->tabs.prev = before;

    if (head == fix) {
        erts_psd_set(p, ERTS_PSD_ETS_FIXED_TABLES, after);
    }
}
164 
#ifdef DEBUG
/*
 * Debug helper: walk the circular fixation list starting at 'first' and
 * return 1 if 'fix' is a member, 0 otherwise. If 'first' is NULL the list
 * head is fetched from the process that 'fix' refers to. While walking,
 * each visited node is asserted to refer to the same process as 'fix'
 * and to have consistent next/prev links.
 */
static int fixed_tabs_find(DbFixation* first, DbFixation* fix)
{
    DbFixation* cur;

    if (first == NULL) {
        first = (DbFixation*) erts_psd_get(fix->procs.p, ERTS_PSD_ETS_FIXED_TABLES);
    }

    cur = first;
    for (;;) {
        if (cur == fix) {
            return 1;
        }
        ASSERT(cur->procs.p == fix->procs.p);
        ASSERT(cur->tabs.next->tabs.prev == cur);
        cur = cur->tabs.next;
        if (cur == first) {
            break;
        }
    }
    return 0;
}
#endif
184 
185 
186 /*
187  * fixing_procs: tree of all processes fixating a table
188  */
/*
 * Configuration of the generic red-black tree template in erl_rbtree.h.
 * Tree nodes are DbFixation structures, linked through their 'procs'
 * member and keyed on the Process pointer 'procs.p'; keys are ordered by
 * comparing the pointer values.
 * NOTE(review): the generated functions are presumably named with the
 * ERTS_RBT_PREFIX ("fixing_procs") -- confirm in erl_rbtree.h.
 */
#define ERTS_RBT_PREFIX fixing_procs
#define ERTS_RBT_T DbFixation
#define ERTS_RBT_KEY_T Process*
#define ERTS_RBT_FLAGS_T int
#define ERTS_RBT_INIT_EMPTY_TNODE(T)                    \
    do {						\
	(T)->procs.parent = NULL;			\
	(T)->procs.right = NULL;				\
	(T)->procs.left = NULL;				\
    } while (0)
/* The only per-node flag is the colour, stored in 'procs.is_red'. */
#define ERTS_RBT_IS_RED(T)        ((T)->procs.is_red)
#define ERTS_RBT_SET_RED(T)       ((T)->procs.is_red = 1)
#define ERTS_RBT_IS_BLACK(T)      (!(T)->procs.is_red)
#define ERTS_RBT_SET_BLACK(T)     ((T)->procs.is_red = 0)
#define ERTS_RBT_GET_FLAGS(T)     ((T)->procs.is_red)
#define ERTS_RBT_SET_FLAGS(T, F)  ((T)->procs.is_red = (F))
#define ERTS_RBT_GET_PARENT(T)    ((T)->procs.parent)
#define ERTS_RBT_SET_PARENT(T, P) ((T)->procs.parent = (P))
#define ERTS_RBT_GET_RIGHT(T)     ((T)->procs.right)
#define ERTS_RBT_SET_RIGHT(T, R)  ((T)->procs.right = (R))
#define ERTS_RBT_GET_LEFT(T)      ((T)->procs.left)
#define ERTS_RBT_SET_LEFT(T, L)   ((T)->procs.left = (L))
#define ERTS_RBT_GET_KEY(T)       ((T)->procs.p)
#define ERTS_RBT_IS_LT(KX, KY)    ((KX) < (KY))
#define ERTS_RBT_IS_EQ(KX, KY)    ((KX) == (KY))

/* Operations requested from the template. */
#define ERTS_RBT_WANT_INSERT
#define ERTS_RBT_WANT_LOOKUP
#define ERTS_RBT_WANT_DELETE
#define ERTS_RBT_WANT_FOREACH
#define ERTS_RBT_WANT_FOREACH_DESTROY
/* NOTE(review): presumably asks erl_rbtree.h to #undef the ERTS_RBT_*
 * configuration macros after use -- confirm in erl_rbtree.h. */
#define ERTS_RBT_UNDEF

#include "erl_rbtree.h"
223 
/*
 * CHECK_TABLES() expands to nothing. Building with HARDDEBUG defined is
 * rejected at compile time since no checking code exists for it.
 */
#ifdef HARDDEBUG
# error Do something useful with CHECK_TABLES maybe
#else
# define CHECK_TABLES()
#endif
229 
230 
/*
 * Forward declarations. schedule_free_dbtable() and delete_sched_table()
 * are called by helpers that appear before their definitions.
 */
static void
send_ets_transfer_message(Process *c_p, Process *proc,
                          ErtsProcLocks *locks,
                          DbTable *tb, Eterm heir_data);
static void schedule_free_dbtable(DbTable* tb);
static void delete_sched_table(Process *c_p, DbTable *tb);
237 
/*
 * Drop one reference to 'tb'. 'min_val' is handed to erts_refc_dectest()
 * as the lowest count expected after the decrement. When the count
 * reaches zero the table structure is scheduled for deallocation.
 */
static void table_dec_refc(DbTable *tb, erts_aint_t min_val)
{
    erts_aint_t remaining = erts_refc_dectest(&tb->common.refc, min_val);

    if (remaining == 0) {
        schedule_free_dbtable(tb);
    }
}
243 
244 static int
db_table_tid_destructor(Binary * unused)245 db_table_tid_destructor(Binary *unused)
246 {
247     return 1;
248 }
249 
250 static ERTS_INLINE void
make_btid(DbTable * tb)251 make_btid(DbTable *tb)
252 {
253     Binary *btid = erts_create_magic_indirection(db_table_tid_destructor);
254     erts_atomic_t *tbref = erts_binary_to_magic_indirection(btid);
255     erts_atomic_init_nob(tbref, (erts_aint_t) tb);
256     tb->common.btid = btid;
257     /*
258      * Table and magic indirection refer eachother,
259      * and table is refered once by being alive...
260      */
261     erts_refc_init(&tb->common.refc, 2);
262     erts_refc_inc(&btid->intern.refc, 1);
263 }
264 
btid2tab(Binary * btid)265 static ERTS_INLINE DbTable* btid2tab(Binary* btid)
266 {
267     erts_atomic_t *tbref = erts_binary_to_magic_indirection(btid);
268     return (DbTable *) erts_atomic_read_nob(tbref);
269 }
270 
271 static DbTable *
tid2tab(Eterm tid,Eterm * error_info_p)272 tid2tab(Eterm tid, Eterm *error_info_p)
273 {
274     DbTable *tb;
275     Binary *btid;
276     erts_atomic_t *tbref;
277 
278     ASSERT(error_info_p);
279     if (!is_internal_magic_ref(tid)) {
280         *error_info_p = EXI_TYPE;
281         return NULL;
282     }
283 
284     btid = erts_magic_ref2bin(tid);
285     if (ERTS_MAGIC_BIN_DESTRUCTOR(btid) != db_table_tid_destructor) {
286         *error_info_p = EXI_TYPE;
287         return NULL;
288     }
289 
290     tbref = erts_binary_to_magic_indirection(btid);
291     tb = (DbTable *) erts_atomic_read_nob(tbref);
292 
293     ASSERT(!tb || tb->common.btid == btid);
294 
295     if (tb == NULL) {
296 	*error_info_p = EXI_ID;
297     }
298 
299     return tb;
300 }
301 
302 static ERTS_INLINE int
is_table_alive(DbTable * tb)303 is_table_alive(DbTable *tb)
304 {
305     erts_atomic_t *tbref;
306     DbTable *rtb;
307 
308     tbref = erts_binary_to_magic_indirection(tb->common.btid);
309     rtb = (DbTable *) erts_atomic_read_nob(tbref);
310 
311     ASSERT(!rtb || rtb == tb);
312 
313     return !!rtb;
314 }
315 
316 static ERTS_INLINE int
is_table_named(DbTable * tb)317 is_table_named(DbTable *tb)
318 {
319     return tb->common.type & DB_NAMED_TABLE;
320 }
321 
322 
323 static ERTS_INLINE void
tid_clear(Process * c_p,DbTable * tb)324 tid_clear(Process *c_p, DbTable *tb)
325 {
326     DbTable *rtb;
327     Binary *btid = tb->common.btid;
328     erts_atomic_t *tbref = erts_binary_to_magic_indirection(btid);
329     rtb = (DbTable *) erts_atomic_xchg_nob(tbref, (erts_aint_t) NULL);
330     ASSERT(!rtb || tb == rtb);
331     if (rtb) {
332         table_dec_refc(tb, 1);
333         delete_sched_table(c_p, tb);
334     }
335 }
336 
337 static ERTS_INLINE Eterm
make_tid(Process * c_p,DbTable * tb)338 make_tid(Process *c_p, DbTable *tb)
339 {
340     Eterm *hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE);
341     return erts_mk_magic_ref(&hp, &c_p->off_heap, tb->common.btid);
342 }
343 
344 Eterm
erts_db_make_tid(Process * c_p,DbTableCommon * tb)345 erts_db_make_tid(Process *c_p, DbTableCommon *tb)
346 {
347     return make_tid(c_p, (DbTable*)tb);
348 }
349 
350 
351 
352 /*
353 ** The meta hash table of all NAMED ets tables
354 */
/* Number of rwlocks protecting the buckets of the meta name table. */
#  define META_NAME_TAB_LOCK_CNT 256
/* Each lock is padded to a cache-line aligned size. */
union {
    erts_rwmtx_t lck;
    byte align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_rwmtx_t))];
}meta_name_tab_rwlocks[META_NAME_TAB_LOCK_CNT];
/*
 * A bucket is in one of three states:
 *   empty:  pu.tb is NULL;
 *   single: u.name_atom is an atom and pu.tb is the table with that name;
 *   multi:  u.mcnt is a small integer holding the length of the entry
 *           vector pu.mvec, whose elements are single entries.
 */
static struct meta_name_tab_entry {
    union {
	Eterm name_atom;
	Eterm mcnt; /* Length of mvec in multiple tab entry */
    }u;
    union {
	DbTable *tb;
	struct meta_name_tab_entry* mvec;
    }pu;
} *meta_name_tab;

/* Mask applied to the atom value of a name to obtain its bucket index. */
static unsigned meta_name_tab_mask;
372 
373 static ERTS_INLINE
meta_name_tab_bucket(Eterm name,erts_rwmtx_t ** lockp)374 struct meta_name_tab_entry* meta_name_tab_bucket(Eterm name,
375 						 erts_rwmtx_t** lockp)
376 {
377     unsigned bix = atom_val(name) & meta_name_tab_mask;
378     struct meta_name_tab_entry* bucket = &meta_name_tab[bix];
379     /* Only non-dirty schedulers are allowed to access the metatable
380        The smp 1 optimizations for ETS depend on that */
381     ASSERT(erts_get_scheduler_data() && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data()));
382     *lockp = &meta_name_tab_rwlocks[bix % META_NAME_TAB_LOCK_CNT].lck;
383     return bucket;
384 }
385 
386 
/*
 * The kind of table access requested from db_lock()/db_unlock().
 * For tables without DB_FINE_LOCKED, LCK_WRITE and LCK_WRITE_REC both
 * take the table rwlock exclusively; for DB_FINE_LOCKED tables only
 * LCK_WRITE does, while LCK_WRITE_REC takes it shared like LCK_READ.
 */
typedef enum {
    LCK_READ=1,     /* read only access */
    LCK_WRITE=2,    /* exclusive table write access */
    LCK_WRITE_REC=3, /* record write access */
    NOLCK_ACCESS=4 /* Used to access the table structure
                      without acquiring the table lock */
} db_lock_kind_t;
394 
/* Method tables of the table implementations; declared here, defined elsewhere. */
extern DbTableMethod db_hash;
extern DbTableMethod db_tree;
extern DbTableMethod db_catree;

/*
 * Globally visible ETS settings.
 * NOTE(review): presumably assigned during emulator start-up from
 * command line options -- confirm against the initialization code.
 */
int user_requested_db_max_tabs;
int erts_ets_realloc_always_moves;
int erts_ets_always_compress;
/* File-local table limit. */
static int db_max_tabs;
403 
404 /*
405 ** Forward decls, static functions
406 */
407 
/* Table fixation, heir handling and fixation cleanup. */
static void fix_table_locked(Process* p, DbTable* tb);
static void unfix_table_locked(Process* p,  DbTable* tb, db_lock_kind_t* kind);
static void set_heir(Process* me, DbTable* tb, Eterm heir, UWord heir_data);
static void free_heir_data(DbTable*);
static SWord free_fixations_locked(Process* p, DbTable *tb);

/* Continuation of yielding operations, printing, and trap BIFs. */
static void delete_all_objects_continue(Process* p, DbTable* tb);
static SWord free_table_continue(Process *p, DbTable *tb, SWord reds);
static void print_table(fmtfn_t to, void *to_arg, int show,  DbTable* tb);
static BIF_RETTYPE ets_select_delete_trap_1(BIF_ALIST_1);
static BIF_RETTYPE ets_select_count_1(BIF_ALIST_1);
static BIF_RETTYPE ets_select_replace_1(BIF_ALIST_1);
static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1);
static BIF_RETTYPE ets_delete_trap(BIF_ALIST_1);
static Eterm table_info(Process* p, DbTable* tb, Eterm What);

/* Shared implementations behind the select BIFs. */
static BIF_RETTYPE ets_select1(Process* p, int bif_ix, Eterm arg1);
static BIF_RETTYPE ets_select2(Process* p, DbTable*, Eterm tid, Eterm ms);
static BIF_RETTYPE ets_select3(Process* p, DbTable*, Eterm tid, Eterm ms, Sint chunk_size);
427 
428 
429 /*
430  * Exported global
431  */
/* Trap export entries with external linkage. */
Export ets_select_delete_continue_exp;
Export ets_select_count_continue_exp;
Export ets_select_replace_continue_exp;
Export ets_select_continue_exp;
/*
 * Static traps
 */
static Export ets_delete_continue_exp;

/* Starts out as NULL.
 * NOTE(review): presumably resolved to an export entry during
 * initialization -- confirm where it is assigned. */
static Export *ets_info_binary_trap = NULL;
442 
443 static void
free_dbtable(void * vtb)444 free_dbtable(void *vtb)
445 {
446     DbTable *tb = (DbTable *) vtb;
447     erts_flxctr_add(&tb->common.counters,
448                     ERTS_DB_TABLE_MEM_COUNTER_ID,
449                     -((Sint)erts_flxctr_nr_of_allocated_bytes(&tb->common.counters)));
450     ASSERT(erts_flxctr_is_snapshot_ongoing(&tb->common.counters) ||
451            sizeof(DbTable) == DB_GET_APPROX_MEM_CONSUMED(tb));
452 
453     ASSERT(is_immed(tb->common.heir_data));
454 
455     if (!DB_LOCK_FREE(tb)) {
456         erts_rwmtx_destroy(&tb->common.rwlock);
457         erts_mtx_destroy(&tb->common.fixlock);
458     }
459 
460     if (tb->common.btid)
461         erts_bin_release(tb->common.btid);
462 
463     erts_flxctr_destroy(&tb->common.counters, ERTS_ALC_T_ETS_CTRS);
464     erts_free(ERTS_ALC_T_DB_TABLE, tb);
465 }
466 
/*
 * Schedule free_dbtable(tb) to run as a thread progress "later" cleanup
 * operation. Both the reference count and the fix count of the table
 * must already be zero.
 *
 * Access rules for the caller once this function has returned:
 *   NON-SMP case: the *tb structure must not be accessed at all.
 *   SMP case:     the *common* part of *tb may still be accessed until
 *                 the bif has returned (typically needed in order to
 *                 release the table lock afterwards); the specialized
 *                 part (hash or tree) must not be accessed.
 */
static void schedule_free_dbtable(DbTable* tb)
{
    ASSERT(erts_refc_read(&tb->common.refc, 0) == 0);
    ASSERT(erts_refc_read(&tb->common.fix_count, 0) == 0);

    erts_schedule_thr_prgr_later_cleanup_op(free_dbtable,
                                            (void *) tb,
                                            &tb->release.data,
                                            sizeof(DbTable));
}
485 
486 static ERTS_INLINE void
save_sched_table(Process * c_p,DbTable * tb)487 save_sched_table(Process *c_p, DbTable *tb)
488 {
489     ErtsSchedulerData *esdp = erts_proc_sched_data(c_p);
490     DbTable *first;
491 
492     ASSERT(esdp);
493     erts_atomic_inc_nob(&esdp->ets_tables.count);
494     erts_refc_inc(&tb->common.refc, 1);
495 
496     first = esdp->ets_tables.clist;
497     if (!first) {
498         tb->common.all.next = tb->common.all.prev = tb;
499         esdp->ets_tables.clist = tb;
500     }
501     else {
502         tb->common.all.prev = first->common.all.prev;
503         tb->common.all.next = first;
504         tb->common.all.prev->common.all.next = tb;
505         first->common.all.prev = tb;
506     }
507 }
508 
509 static ERTS_INLINE void
remove_sched_table(ErtsSchedulerData * esdp,DbTable * tb)510 remove_sched_table(ErtsSchedulerData *esdp, DbTable *tb)
511 {
512     ErtsEtsAllYieldData *eaydp;
513     ASSERT(esdp);
514     ASSERT(erts_get_ref_numbers_thr_id(ERTS_MAGIC_BIN_REFN(tb->common.btid))
515            == (Uint32) esdp->no);
516 
517     ASSERT(erts_atomic_read_nob(&esdp->ets_tables.count) > 0);
518     erts_atomic_dec_nob(&esdp->ets_tables.count);
519 
520     eaydp = ERTS_SCHED_AUX_YIELD_DATA(esdp, ets_all);
521     if (eaydp->ongoing) {
522         /* ets:all() op process list from last to first... */
523         if (eaydp->tab == tb) {
524             if (eaydp->tab == esdp->ets_tables.clist)
525                 eaydp->tab = NULL;
526             else
527                 eaydp->tab = tb->common.all.prev;
528         }
529     }
530 
531     if (tb->common.all.next == tb) {
532         ASSERT(tb->common.all.prev == tb);
533         ASSERT(esdp->ets_tables.clist == tb);
534         esdp->ets_tables.clist = NULL;
535     }
536     else {
537 #ifdef DEBUG
538         DbTable *tmp = esdp->ets_tables.clist;
539         do {
540             if (tmp == tb) break;
541             tmp = tmp->common.all.next;
542         } while (tmp != esdp->ets_tables.clist);
543         ASSERT(tmp == tb);
544 #endif
545         tb->common.all.prev->common.all.next = tb->common.all.next;
546         tb->common.all.next->common.all.prev = tb->common.all.prev;
547 
548         if (esdp->ets_tables.clist == tb)
549             esdp->ets_tables.clist = tb->common.all.next;
550 
551     }
552 
553     table_dec_refc(tb, 0);
554 }
555 
556 static void
scheduled_remove_sched_table(void * vtb)557 scheduled_remove_sched_table(void *vtb)
558 {
559     remove_sched_table(erts_get_scheduler_data(), (DbTable *) vtb);
560 }
561 
562 static void
delete_sched_table(Process * c_p,DbTable * tb)563 delete_sched_table(Process *c_p, DbTable *tb)
564 {
565     ErtsSchedulerData *esdp = erts_proc_sched_data(c_p);
566     Uint32 sid;
567 
568     ASSERT(esdp);
569 
570     ASSERT(tb->common.btid);
571     sid = erts_get_ref_numbers_thr_id(ERTS_MAGIC_BIN_REFN(tb->common.btid));
572     ASSERT(1 <= sid && sid <= erts_no_schedulers);
573     if (sid == (Uint32) esdp->no)
574         remove_sched_table(esdp, tb);
575     else
576         erts_schedule_misc_aux_work((int) sid, scheduled_remove_sched_table, tb);
577 }
578 
579 static ERTS_INLINE void
save_owned_table(Process * c_p,DbTable * tb)580 save_owned_table(Process *c_p, DbTable *tb)
581 {
582     DbTable *first;
583 
584     erts_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);
585 
586     first = (DbTable*) erts_psd_get(c_p, ERTS_PSD_ETS_OWNED_TABLES);
587 
588     erts_refc_inc(&tb->common.refc, 1);
589 
590     if (!first) {
591         tb->common.owned.next = tb->common.owned.prev = tb;
592         erts_psd_set(c_p, ERTS_PSD_ETS_OWNED_TABLES, tb);
593     }
594     else {
595         tb->common.owned.prev = first->common.owned.prev;
596         tb->common.owned.next = first;
597         tb->common.owned.prev->common.owned.next = tb;
598         first->common.owned.prev = tb;
599     }
600     erts_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);
601 }
602 
603 static ERTS_INLINE void
delete_owned_table(Process * p,DbTable * tb)604 delete_owned_table(Process *p, DbTable *tb)
605 {
606     erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
607     if (tb->common.owned.next == tb) {
608         DbTable* old;
609         ASSERT(tb->common.owned.prev == tb);
610         old = erts_psd_set(p, ERTS_PSD_ETS_OWNED_TABLES, NULL);
611         ASSERT(old == tb); (void)old;
612     }
613     else {
614         DbTable *first = (DbTable*) erts_psd_get(p, ERTS_PSD_ETS_OWNED_TABLES);
615 #ifdef DEBUG
616         DbTable *tmp = first;
617         do {
618             if (tmp == tb) break;
619             tmp = tmp->common.owned.next;
620         } while (tmp != first);
621         ASSERT(tmp == tb);
622 #endif
623         tb->common.owned.prev->common.owned.next = tb->common.owned.next;
624         tb->common.owned.next->common.owned.prev = tb->common.owned.prev;
625 
626         if (tb == first)
627             erts_psd_set(p, ERTS_PSD_ETS_OWNED_TABLES, tb->common.owned.next);
628     }
629     erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
630 
631     table_dec_refc(tb, 1);
632 }
633 
db_init_lock(DbTable * tb,int use_frequent_read_lock)634 static ERTS_INLINE void db_init_lock(DbTable* tb, int use_frequent_read_lock)
635 {
636     erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
637     if (use_frequent_read_lock)
638         rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
639     if (erts_ets_rwmtx_spin_count >= 0)
640         rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
641     if (!DB_LOCK_FREE(tb)) {
642         erts_rwmtx_init_opt(&tb->common.rwlock, &rwmtx_opt, "db_tab",
643                             tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
644         erts_mtx_init(&tb->common.fixlock, "db_tab_fix",
645                       tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
646     }
647     tb->common.is_thread_safe = !(tb->common.status & DB_FINE_LOCKED);
648     ASSERT(!DB_LOCK_FREE(tb) || tb->common.is_thread_safe);
649 }
650 
db_lock(DbTable * tb,db_lock_kind_t kind)651 static ERTS_INLINE void db_lock(DbTable* tb, db_lock_kind_t kind)
652 {
653     if (DB_LOCK_FREE(tb))
654         return;
655     if (tb->common.type & DB_FINE_LOCKED) {
656         if (kind == LCK_WRITE) {
657             erts_rwmtx_rwlock(&tb->common.rwlock);
658             tb->common.is_thread_safe = 1;
659         } else if (kind != NOLCK_ACCESS) {
660             erts_rwmtx_rlock(&tb->common.rwlock);
661             ASSERT(!tb->common.is_thread_safe);
662         }
663     }
664     else
665     {
666         switch (kind) {
667         case LCK_WRITE:
668         case LCK_WRITE_REC:
669             erts_rwmtx_rwlock(&tb->common.rwlock);
670             break;
671         case NOLCK_ACCESS:
672             return;
673         default:
674             erts_rwmtx_rlock(&tb->common.rwlock);
675         }
676         ASSERT(tb->common.is_thread_safe);
677     }
678 }
679 
db_unlock(DbTable * tb,db_lock_kind_t kind)680 static ERTS_INLINE void db_unlock(DbTable* tb, db_lock_kind_t kind)
681 {
682     if (DB_LOCK_FREE(tb) || kind == NOLCK_ACCESS)
683         return;
684     if (tb->common.type & DB_FINE_LOCKED) {
685         if (kind == LCK_WRITE) {
686             ASSERT(tb->common.is_thread_safe);
687             tb->common.is_thread_safe = 0;
688             erts_rwmtx_rwunlock(&tb->common.rwlock);
689         }
690         else {
691             ASSERT(!tb->common.is_thread_safe);
692             erts_rwmtx_runlock(&tb->common.rwlock);
693         }
694     }
695     else {
696         ASSERT(tb->common.is_thread_safe);
697         switch (kind) {
698         case LCK_WRITE:
699         case LCK_WRITE_REC:
700             erts_rwmtx_rwunlock(&tb->common.rwlock);
701             break;
702         default:
703             erts_rwmtx_runlock(&tb->common.rwlock);
704         }
705     }
706 }
707 
db_is_exclusive(DbTable * tb,db_lock_kind_t kind)708 static ERTS_INLINE int db_is_exclusive(DbTable* tb, db_lock_kind_t kind)
709 {
710     if (DB_LOCK_FREE(tb))
711         return 1;
712 
713     return
714         kind != LCK_READ &&
715         kind != NOLCK_ACCESS &&
716         tb->common.is_thread_safe;
717 }
718 
/*
 * Called by db_get_table_aux() when the status of the locked table 'tb'
 * lacks the access bits that were asked for.
 *
 * DB_BUSY table: a yielding operation on the table is unfinished. The
 *   caller helps it along under an exclusive table lock, then releases
 *   the lock and returns NULL with *freason_p = TRAP so that the
 *   operation is retried.
 * Caller is neither the owner nor an ETS super user: the lock is
 *   released and NULL is returned with p->fvalue = EXI_ACCESS and
 *   *freason_p = BADARG | EXF_HAS_EXT_INFO.
 * Otherwise 'tb' is returned, still locked with 'kind'.
 */
static DbTable* handle_lacking_permission(Process* p, DbTable* tb,
                                          db_lock_kind_t kind,
                                          Uint* freason_p)
{
    if (tb->common.status & DB_BUSY) {
        void* continuation_state;
        /* Make sure the table is held exclusively before helping out. */
        if (!db_is_exclusive(tb, kind)) {
            db_unlock(tb, kind);
            db_lock(tb, LCK_WRITE);
        }
        continuation_state = (void*)erts_atomic_read_nob(&tb->common.continuation_state);
        if (continuation_state != NULL) {
            /* Run the stored continuation with a budget derived from the
             * reductions the process has left. */
            const long iterations_per_red = 10;
            const long reds = iterations_per_red * ERTS_BIF_REDS_LEFT(p);
            long nr_of_reductions = DBG_RANDOM_REDS(reds, (Uint)freason_p);
            const long init_reds = nr_of_reductions;
            tb->common.continuation(&nr_of_reductions,
                                    &continuation_state,
                                    NULL);
            /* The continuation reset the state to NULL: publish that. */
            if (continuation_state == NULL) {
                erts_atomic_set_relb(&tb->common.continuation_state, (Sint)NULL);
            }
            /* Charge the process for the budget that was consumed. */
            BUMP_REDS(p, (init_reds - nr_of_reductions) / iterations_per_red);
        } else {
            delete_all_objects_continue(p, tb);
        }
        /* The table is held with exclusive access at this point. */
        db_unlock(tb, LCK_WRITE);
        tb = NULL;
        *freason_p = TRAP;
    }
    else if (p->common.id != tb->common.owner
             && !(p->flags & F_ETS_SUPER_USER)) {
        db_unlock(tb, kind);
        tb = NULL;
        p->fvalue = EXI_ACCESS;
        *freason_p = BADARG | EXF_HAS_EXT_INFO;
    }
    return tb;
}
758 
/*
 * Look up the table identified by 'id' and lock it according to 'kind'.
 *
 * 'id' is either an atom, which is looked up in the meta name table, or
 * any other term, which is translated by tid2tab(). 'what' is a mask of
 * table status bits required for the access; DB_READ_TBL_STRUCT must be
 * used together with NOLCK_ACCESS and vice versa. 'meta_already_locked'
 * is non-zero if the caller already holds the meta name table lock
 * covering 'id'.
 *
 * Returns the locked table, or NULL with the failure reason stored in
 * *freason_p: BADARG | EXF_HAS_EXT_INFO (with the cause in p->fvalue) or
 * TRAP. On failure no table lock is held.
 */
static ERTS_INLINE
DbTable* db_get_table_aux(Process *p,
			  Eterm id,
			  int what,
			  db_lock_kind_t kind,
			  int meta_already_locked,
                          Uint* freason_p)
{
    DbTable *tb;

    /*
     * IMPORTANT: Only non-dirty scheduler threads are allowed
     *            to access tables. Memory management depend on it.
     */
    ASSERT(erts_get_scheduler_data() && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data()));

    ASSERT((what == DB_READ_TBL_STRUCT) == (kind == NOLCK_ACCESS));

    /* No meta table locking is done when the meta table is lock free. */
    if (META_DB_LOCK_FREE())
        meta_already_locked = 1;

    if (is_not_atom(id)) {
        /* Table identifier; tid2tab() stores the error cause in p->fvalue. */
        tb = tid2tab(id, &p->fvalue);
    } else {
        /* Named table: search the bucket of the name. */
        erts_rwmtx_t *mtl;
	struct meta_name_tab_entry* bucket = meta_name_tab_bucket(id,&mtl);
	if (!meta_already_locked)
	    erts_rwmtx_rlock(mtl);
	else {
	    ERTS_LC_ASSERT(META_DB_LOCK_FREE()
                           || erts_lc_rwmtx_is_rlocked(mtl)
                           || erts_lc_rwmtx_is_rwlocked(mtl));
	}
        tb = NULL;
	if (bucket->pu.tb != NULL) {
	    if (is_atom(bucket->u.name_atom)) { /* single */
		if (bucket->u.name_atom == id)
		    tb = bucket->pu.tb;
	    }
	    else { /* multi */
		Uint cnt = unsigned_val(bucket->u.mcnt);
		Uint i;
		for (i=0; i<cnt; i++) {
		    if (bucket->pu.mvec[i].u.name_atom == id) {
			tb = bucket->pu.mvec[i].pu.tb;
			break;
		    }
		}
	    }
	}
        if (!meta_already_locked)
            erts_rwmtx_runlock(mtl);

	if (tb == NULL) {
            p->fvalue = EXI_ID;
	}
    }

    if (tb) {
	db_lock(tb, kind);
#ifdef ETS_DBG_FORCE_TRAP
        /*
         * The ets_SUITE uses this to verify that all table lookups calls
         * can handle a failed TRAP return correctly.
         */
        if (what != DB_READ_TBL_STRUCT && tb->common.dbg_force_trap) {
            if (!(p->flags & F_DBG_FORCED_TRAP)) {
                db_unlock(tb, kind);
                tb = NULL;
                *freason_p = TRAP;
                p->fvalue = EXI_TYPE;
                p->flags |= F_DBG_FORCED_TRAP;
                return tb;
            } else {
                /* back from forced trap */
                p->flags &= ~F_DBG_FORCED_TRAP;
            }
        }
#endif
        if (what != DB_READ_TBL_STRUCT
            /* IMPORTANT: the above check is necessary as the status field
                          might be in an intermediate state when
                          kind==NOLCK_ACCESS */
                && ERTS_UNLIKELY(!(tb->common.status & what))) {
            /* May unlock the table and return NULL with *freason_p set. */
            tb = handle_lacking_permission(p, tb, kind, freason_p);
        }
    }
    else {
        /* Not found; p->fvalue was set to the cause above. */
        *freason_p = BADARG | EXF_HAS_EXT_INFO;
    }

    return tb;
}
852 
853 static ERTS_INLINE
db_get_table(Process * p,Eterm id,int what,db_lock_kind_t kind,Uint * freason_p)854 DbTable* db_get_table(Process *p,
855 		      Eterm id,
856 		      int what,
857 		      db_lock_kind_t kind,
858                       Uint* freason_p)
859 {
860     return db_get_table_aux(p, id, what, kind, 0, freason_p);
861 }
862 
db_get_table_or_fail_return(DbTable ** tb,Eterm table_id,Uint32 what,db_lock_kind_t kind,Uint bif_ix,Process * p)863 static BIF_RETTYPE db_get_table_or_fail_return(DbTable **tb, /* out */
864                                                Eterm table_id,
865                                                Uint32 what,
866                                                db_lock_kind_t kind,
867                                                Uint bif_ix,
868                                                Process* p)
869 {
870     DB_GET_TABLE(*tb, table_id, what, kind, bif_ix, NULL, p);
871     return THE_NON_VALUE;
872 }
873 
/*
 * Register table 'tb' under the name 'name_atom' in the meta name table.
 *
 * 'have_lock' is non-zero if the caller already holds the bucket's
 * rwlock exclusively; otherwise the lock is taken and released here
 * (unless the meta table is lock free).
 *
 * Returns 1 if the name was inserted, 0 if it was already registered.
 */
static int insert_named_tab(Eterm name_atom, DbTable* tb, int have_lock)
{
    int ret = 0;
    erts_rwmtx_t* rwlock;
    struct meta_name_tab_entry* new_entry;
    struct meta_name_tab_entry* bucket = meta_name_tab_bucket(name_atom,
							      &rwlock);

    if (META_DB_LOCK_FREE())
        have_lock = 1;

    if (!have_lock)
	erts_rwmtx_rwlock(rwlock);

    if (bucket->pu.tb == NULL) { /* empty */
	/* The bucket itself becomes a single entry. */
	new_entry = bucket;
    }
    else {
	struct meta_name_tab_entry* entries;
	Uint cnt;
	if (is_atom(bucket->u.name_atom)) { /* single */
	    size_t size;
	    if (bucket->u.name_atom == name_atom) {
		goto done;
	    }
	    /* Convert to a multi bucket: allocate a vector of two entries
	     * and move the existing entry into the second slot. */
	    cnt = 2;
	    size = sizeof(struct meta_name_tab_entry)*cnt;
	    entries = erts_db_alloc_nt(ERTS_ALC_T_DB_NTAB_ENT, size);
	    ERTS_ETS_MISC_MEM_ADD(size);
	    new_entry = &entries[0];
	    entries[1] = *bucket;
	}
	else { /* multi */
	    size_t size, old_size;
	    Uint i;
	    cnt = unsigned_val(bucket->u.mcnt);
	    for (i=0; i<cnt; i++) {
		if (bucket->pu.mvec[i].u.name_atom == name_atom) {
		    goto done;
		}
	    }
	    /* Grow the vector by one entry; the new entry goes last. */
	    old_size = sizeof(struct meta_name_tab_entry)*cnt;
	    size = sizeof(struct meta_name_tab_entry)*(cnt+1);
	    entries = erts_db_realloc_nt(ERTS_ALC_T_DB_NTAB_ENT,
					 bucket->pu.mvec,
					 old_size,
					 size);
	    ERTS_ETS_MISC_MEM_ADD(size-old_size);
	    new_entry = &entries[cnt];
	    cnt++;
	}
	/* The bucket now refers to the (new or grown) entry vector. */
	bucket->pu.mvec = entries;
	bucket->u.mcnt = make_small(cnt);
    }
    new_entry->pu.tb = tb;
    new_entry->u.name_atom = name_atom;
    ret = 1; /* Ok */

done:
    if (!have_lock)
	erts_rwmtx_rwunlock(rwlock);
    return ret;
}
937 
/*
 * Remove the name of 'tb' (tb->common.the_name) from the meta name table.
 *
 * Unless 'have_lock' is non-zero or META_DB_LOCK_FREE() is true, the
 * bucket's rwlock is taken here. If the try-lock reports EBUSY, the
 * table's write lock is dropped while blocking on the bucket lock and
 * re-taken afterwards.
 *
 * Returns 1 if the name was found and removed, otherwise 0.
 */
static int remove_named_tab(DbTable *tb, int have_lock)
{
    erts_rwmtx_t *rwlock;
    Eterm name_atom = tb->common.the_name;
    struct meta_name_tab_entry *bucket = meta_name_tab_bucket(name_atom,
                                                              &rwlock);
    int lock_here;
    int removed = 0;

    ASSERT(is_table_named(tb));

    lock_here = !(META_DB_LOCK_FREE() || have_lock);

    if (lock_here && erts_rwmtx_tryrwlock(rwlock) == EBUSY) {
        db_unlock(tb, LCK_WRITE);
        erts_rwmtx_rwlock(rwlock);
        db_lock(tb, LCK_WRITE);
    }

    ERTS_LC_ASSERT(META_DB_LOCK_FREE() || erts_lc_rwmtx_is_rwlocked(rwlock));

    if (bucket->pu.tb == NULL) {
        /* Empty bucket: nothing to remove. */
        goto unlock;
    }

    if (is_atom(bucket->u.name_atom)) {
        /* Single entry. */
        if (bucket->u.name_atom != name_atom) {
            goto unlock;
        }
        bucket->pu.tb = NULL;
    }
    else {
        /* Vector of entries. */
        Uint n_entries = unsigned_val(bucket->u.mcnt);
        Uint pos = 0;

        while (bucket->pu.mvec[pos].u.name_atom != name_atom) {
            if (++pos >= n_entries) {
                goto unlock;
            }
        }

        if (n_entries == 2) {
            /* Collapse the vector: the surviving entry moves into the bucket. */
            struct meta_name_tab_entry *vec = bucket->pu.mvec;
            const size_t vec_bytes = sizeof(struct meta_name_tab_entry) * n_entries;

            *bucket = vec[1 - pos];
            erts_db_free_nt(ERTS_ALC_T_DB_NTAB_ENT, vec, vec_bytes);
            ERTS_ETS_MISC_MEM_ADD(-vec_bytes);
            ASSERT(is_atom(bucket->u.name_atom));
        }
        else {
            Uint last;
            size_t old_bytes;
            size_t new_bytes;

            ASSERT(n_entries > 2);
            last = n_entries - 1;
            bucket->u.mcnt = make_small(last);
            if (pos != last) {
                /* Move the last entry into the hole before the realloc
                 * cuts it off. */
                bucket->pu.mvec[pos] = bucket->pu.mvec[last];
            }
            old_bytes = sizeof(struct meta_name_tab_entry) * (last + 1);
            new_bytes = sizeof(struct meta_name_tab_entry) * last;
            bucket->pu.mvec = erts_db_realloc_nt(ERTS_ALC_T_DB_NTAB_ENT,
                                                 bucket->pu.mvec,
                                                 old_bytes,
                                                 new_bytes);
            ERTS_ETS_MISC_MEM_ADD(new_bytes - old_bytes);
        }
    }
    removed = 1; /* Ok */

unlock:
    if (lock_here) {
        erts_rwmtx_rwunlock(rwlock);
    }
    return removed;
}
1012 
1013 /* Do a fast fixation of a hash table.
1014 ** Must be matched by a local unfix before releasing table lock.
1015 */
local_fix_table(DbTable * tb)1016 static ERTS_INLINE void local_fix_table(DbTable* tb)
1017 {
1018     erts_refc_inc(&tb->common.fix_count, 1);
1019 }
local_unfix_table(DbTable * tb)1020 static ERTS_INLINE void local_unfix_table(DbTable* tb)
1021 {
1022     if (erts_refc_dectest(&tb->common.fix_count, 0) == 0) {
1023 	ASSERT(IS_HASH_TABLE(tb->common.status));
1024 	db_unfix_table_hash(&(tb->hash));
1025     }
1026 }
1027 
1028 
1029 /*
1030  * BIFs.
1031  */
1032 
ets_safe_fixtable_2(BIF_ALIST_2)1033 BIF_RETTYPE ets_safe_fixtable_2(BIF_ALIST_2)
1034 {
1035     DbTable *tb;
1036     db_lock_kind_t kind;
1037 #ifdef HARDDEBUG
1038     erts_fprintf(stderr,
1039 		"ets:safe_fixtable(%T,%T); Process: %T, initial: %T:%T/%bpu\n",
1040 		BIF_ARG_1, BIF_ARG_2, BIF_P->common.id,
1041 		BIF_P->u.initial[0], BIF_P->u.initial[1], BIF_P->u.initial[2]);
1042 #endif
1043     kind = (BIF_ARG_2 == am_true) ? LCK_READ : LCK_WRITE_REC;
1044 
1045     DB_BIF_GET_TABLE(tb, DB_READ, kind, BIF_ets_safe_fixtable_2);
1046 
1047     if (BIF_ARG_2 == am_true) {
1048 	fix_table_locked(BIF_P, tb);
1049     }
1050     else if (BIF_ARG_2 == am_false) {
1051 	if (IS_FIXED(tb)) {
1052 	    unfix_table_locked(BIF_P, tb, &kind);
1053 	}
1054     }
1055     else {
1056 	db_unlock(tb, kind);
1057 	BIF_ERROR(BIF_P, BADARG);
1058     }
1059     db_unlock(tb, kind);
1060     BIF_RET(am_true);
1061 }
1062 
1063 
1064 /*
1065 ** Returns the first Key in a table
1066 */
ets_first_1(BIF_ALIST_1)1067 BIF_RETTYPE ets_first_1(BIF_ALIST_1)
1068 {
1069     DbTable* tb;
1070     int cret;
1071     Eterm ret;
1072 
1073     CHECK_TABLES();
1074 
1075     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_first_1);
1076 
1077     cret = tb->common.meth->db_first(BIF_P, tb, &ret);
1078 
1079     db_unlock(tb, LCK_READ);
1080 
1081     if (cret != DB_ERROR_NONE) {
1082 	BIF_ERROR(BIF_P, BADARG);
1083     }
1084     BIF_RET(ret);
1085 }
1086 
1087 /*
1088 ** The next BIF, given a key, return the "next" key
1089 */
ets_next_2(BIF_ALIST_2)1090 BIF_RETTYPE ets_next_2(BIF_ALIST_2)
1091 {
1092     DbTable* tb;
1093     int cret;
1094     Eterm ret;
1095 
1096     CHECK_TABLES();
1097 
1098     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_next_2);
1099 
1100     cret = tb->common.meth->db_next(BIF_P, tb, BIF_ARG_2, &ret);
1101 
1102     db_unlock(tb, LCK_READ);
1103 
1104     if (cret != DB_ERROR_NONE) {
1105 	BIF_ERROR(BIF_P, BADARG);
1106     }
1107     BIF_RET(ret);
1108 }
1109 
1110 /*
1111 ** Returns the last Key in a table
1112 */
ets_last_1(BIF_ALIST_1)1113 BIF_RETTYPE ets_last_1(BIF_ALIST_1)
1114 {
1115     DbTable* tb;
1116     int cret;
1117     Eterm ret;
1118 
1119     CHECK_TABLES();
1120 
1121     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_last_1);
1122 
1123     cret = tb->common.meth->db_last(BIF_P, tb, &ret);
1124 
1125     db_unlock(tb, LCK_READ);
1126 
1127     if (cret != DB_ERROR_NONE) {
1128 	BIF_ERROR(BIF_P, BADARG);
1129     }
1130     BIF_RET(ret);
1131 }
1132 
1133 /*
1134 ** The prev BIF, given a key, return the "previous" key
1135 */
ets_prev_2(BIF_ALIST_2)1136 BIF_RETTYPE ets_prev_2(BIF_ALIST_2)
1137 {
1138     DbTable* tb;
1139     int cret;
1140     Eterm ret;
1141 
1142     CHECK_TABLES();
1143 
1144     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_prev_2);
1145 
1146     cret = tb->common.meth->db_prev(BIF_P,tb,BIF_ARG_2,&ret);
1147 
1148     db_unlock(tb, LCK_READ);
1149 
1150     if (cret != DB_ERROR_NONE) {
1151 	BIF_ERROR(BIF_P, BADARG);
1152     }
1153     BIF_RET(ret);
1154 }
1155 
1156 /*
1157 ** take(Tab, Key)
1158 */
ets_take_2(BIF_ALIST_2)1159 BIF_RETTYPE ets_take_2(BIF_ALIST_2)
1160 {
1161     DbTable* tb;
1162     int cret;
1163     Eterm ret;
1164     CHECK_TABLES();
1165 
1166     DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_take_2);
1167 
1168     cret = tb->common.meth->db_take(BIF_P, tb, BIF_ARG_2, &ret);
1169 
1170     ASSERT(cret == DB_ERROR_NONE); (void)cret;
1171     db_unlock(tb, LCK_WRITE_REC);
1172     BIF_RET(ret);
1173 }
1174 
1175 /*
1176 ** update_element(Tab, Key, {Pos, Value})
1177 ** update_element(Tab, Key, [{Pos, Value}])
1178 */
ets_update_element_3(BIF_ALIST_3)1179 BIF_RETTYPE ets_update_element_3(BIF_ALIST_3)
1180 {
1181     DbTable* tb;
1182     int cret = DB_ERROR_BADITEM;
1183     Eterm list;
1184     Eterm iter;
1185     DeclareTmpHeap(cell,2,BIF_P);
1186     DbUpdateHandle handle;
1187 
1188     DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_element_3);
1189 
1190     UseTmpHeap(2,BIF_P);
1191     if (!(tb->common.status & (DB_SET | DB_ORDERED_SET | DB_CA_ORDERED_SET))) {
1192 	BIF_P->fvalue = EXI_TAB_TYPE;
1193 	cret = DB_ERROR_BADPARAM;
1194 	goto bail_out;
1195     }
1196     if (is_tuple(BIF_ARG_3)) {
1197 	list = CONS(cell, BIF_ARG_3, NIL);
1198     }
1199     else {
1200 	list = BIF_ARG_3;
1201     }
1202 
1203     if (!tb->common.meth->db_lookup_dbterm(BIF_P, tb, BIF_ARG_2, THE_NON_VALUE, &handle)) {
1204 	cret = DB_ERROR_BADKEY;
1205 	goto bail_out;
1206     }
1207 
1208     /* First verify that list is ok to avoid nasty rollback scenarios
1209     */
1210     for (iter=list ; is_not_nil(iter); iter = CDR(list_val(iter))) {
1211 	Eterm pv;
1212 	Eterm* pvp;
1213 	Sint position;
1214 
1215 	if (is_not_list(iter)) {
1216 	    goto finalize;
1217 	}
1218 	pv = CAR(list_val(iter));    /* {Pos,Value} */
1219 	if (is_not_tuple(pv)) {
1220 	    goto finalize;
1221 	}
1222 	pvp = tuple_val(pv);
1223 	if (arityval(*pvp) != 2 || !is_small(pvp[1])) {
1224 	    goto finalize;
1225 	}
1226 	position = signed_val(pvp[1]);
1227 	if (position == tb->common.keypos) {
1228             BIF_P->fvalue = EXI_KEY_POS;
1229             cret = DB_ERROR_UNSPEC;
1230             goto finalize;
1231 	}
1232 	if (position < 1 || position == tb->common.keypos ||
1233 	    position > arityval(handle.dbterm->tpl[0])) {
1234 	    goto finalize;
1235         }
1236     }
1237     /* The point of no return, no failures from here on.
1238     */
1239     cret = DB_ERROR_NONE;
1240 
1241     for (iter=list ; is_not_nil(iter); iter = CDR(list_val(iter))) {
1242 	Eterm* pvp = tuple_val(CAR(list_val(iter)));    /* {Pos,Value} */
1243 	db_do_update_element(&handle, signed_val(pvp[1]), pvp[2]);
1244     }
1245 
1246 finalize:
1247     tb->common.meth->db_finalize_dbterm(cret, &handle);
1248 
1249 bail_out:
1250     UnUseTmpHeap(2,BIF_P);
1251     db_unlock(tb, LCK_WRITE_REC);
1252 
1253     switch (cret) {
1254     case DB_ERROR_NONE:
1255 	BIF_RET(am_true);
1256     case DB_ERROR_BADKEY:
1257 	BIF_RET(am_false);
1258     case DB_ERROR_SYSRES:
1259 	BIF_ERROR(BIF_P, SYSTEM_LIMIT);
1260     case DB_ERROR_UNSPEC:
1261         BIF_ERROR(BIF_P, BADARG | EXF_HAS_EXT_INFO);
1262     default:
1263 	BIF_ERROR(BIF_P, BADARG);
1264 	break;
1265     }
1266 }
1267 
1268 static BIF_RETTYPE
do_update_counter(Process * p,DbTable * tb,Eterm arg2,Eterm arg3,Eterm arg4)1269 do_update_counter(Process *p, DbTable* tb,
1270                   Eterm arg2, Eterm arg3, Eterm arg4)
1271 {
1272     int cret = DB_ERROR_BADITEM;
1273     Eterm upop_list;
1274     int list_size;
1275     Eterm ret;  /* int or [int] */
1276     Eterm* ret_list_currp = NULL;
1277     Eterm* ret_list_prevp = NULL;
1278     Eterm iter;
1279     DeclareTmpHeap(cell, 5, p);
1280     Eterm *tuple = cell+2;
1281     DbUpdateHandle handle;
1282     Uint halloc_size = 0; /* overestimated heap usage */
1283     Eterm* htop;          /* actual heap usage */
1284     Eterm* hstart;
1285     Eterm* hend;
1286 
1287     UseTmpHeap(5, p);
1288     if (!(tb->common.status & (DB_SET | DB_ORDERED_SET | DB_CA_ORDERED_SET))) {
1289         p->fvalue = EXI_TAB_TYPE;
1290         cret = DB_ERROR_BADPARAM;
1291 	goto bail_out;
1292     }
1293     if (is_integer(arg3)) { /* Incr */
1294         upop_list = CONS(cell,
1295                          TUPLE2(tuple, make_small(tb->common.keypos+1), arg3),
1296                          NIL);
1297     }
1298     else if (is_tuple(arg3)) { /* {Upop} */
1299         upop_list = CONS(cell, arg3, NIL);
1300     }
1301     else { /* [{Upop}] (probably) */
1302         upop_list = arg3;
1303 	ret_list_prevp = &ret;
1304     }
1305 
1306     if (!tb->common.meth->db_lookup_dbterm(p, tb, arg2, arg4, &handle)) {
1307 	p->fvalue = EXI_BAD_KEY;
1308 	cret = DB_ERROR_BADPARAM;
1309 	goto bail_out; /* key not found */
1310     }
1311 
1312     /* First verify that list is ok to avoid nasty rollback scenarios
1313     */
1314     list_size = 0;
1315     for (iter=upop_list ; is_not_nil(iter); iter = CDR(list_val(iter)),
1316 	                                    list_size += 2) {
1317 	Eterm upop;
1318 	Eterm* tpl;
1319 	Sint position;
1320 	Eterm incr, warp;
1321 	Wterm oldcnt;
1322 
1323 	if (is_not_list(iter)) {
1324 	    goto finalize;
1325 	}
1326 	upop = CAR(list_val(iter));
1327 	if (is_not_tuple(upop)) {
1328 	    goto finalize;
1329 	}
1330 	tpl = tuple_val(upop);
1331 	switch (arityval(*tpl)) {
1332 	case 4: /* threshold specified */
1333 	    if (is_not_integer(tpl[3])) {
1334 		goto finalize;
1335 	    }
1336 	    warp = tpl[4];
1337 	    if (is_big(warp)) {
1338 		halloc_size += BIG_NEED_SIZE(big_arity(warp));
1339 	    }
1340 	    else if (is_not_small(warp)) {
1341 		goto finalize;
1342 	    }
1343 	    /* Fall through */
1344 	case 2:
1345 	    if (!is_small(tpl[1])) {
1346 		goto finalize;
1347 	    }
1348 	    incr = tpl[2];
1349 	    if (is_big(incr)) {
1350 		halloc_size += BIG_NEED_SIZE(big_arity(incr));
1351 	    }
1352 	    else if (is_not_small(incr)) {
1353 		goto finalize;
1354 	    }
1355 	    position = signed_val(tpl[1]);
1356 	    if (position == tb->common.keypos) {
1357                 p->fvalue = EXI_KEY_POS;
1358                 cret = DB_ERROR_BADPARAM;
1359                 goto finalize;
1360             }
1361             else if (position < 1 || position > arityval(handle.dbterm->tpl[0])) {
1362                 p->fvalue = EXI_POSITION;
1363                 cret = DB_ERROR_BADPARAM;
1364 		goto finalize;
1365 	    }
1366 	    oldcnt = db_do_read_element(&handle, position);
1367 	    if (is_big(oldcnt)) {
1368 		halloc_size += BIG_NEED_SIZE(big_arity(oldcnt));
1369 	    }
1370 	    else if (is_not_small(oldcnt)) {
1371 		goto finalize;
1372 	    }
1373 	    break;
1374 	default:
1375 	    goto finalize;
1376 	}
1377 	halloc_size += 2;  /* worst growth case: small(0)+small(0)=big(2) */
1378     }
1379 
1380     /* The point of no return, no failures from here on.
1381     */
1382     cret = DB_ERROR_NONE;
1383 
1384     if (ret_list_prevp) { /* Prepare to return a list */
1385 	ret = NIL;
1386 	halloc_size += list_size;
1387 	hstart = HAlloc(p, halloc_size);
1388 	ret_list_currp = hstart;
1389 	htop = hstart + list_size;
1390 	hend = hstart + halloc_size;
1391     }
1392     else {
1393 #ifdef DEBUG
1394         ret = THE_NON_VALUE;
1395 #endif
1396 	hstart = htop = HAlloc(p, halloc_size);
1397     }
1398     hend = hstart + halloc_size;
1399 
1400     for (iter=upop_list ; is_not_nil(iter); iter = CDR(list_val(iter))) {
1401 
1402 	Eterm* tpl = tuple_val(CAR(list_val(iter)));
1403 	Sint position = signed_val(tpl[1]);
1404 	Eterm incr = tpl[2];
1405 	Wterm oldcnt = db_do_read_element(&handle,position);
1406 	Eterm newcnt = db_add_counter(&htop, oldcnt, incr);
1407 
1408 	if (newcnt == NIL) {
1409 	    cret = DB_ERROR_SYSRES; /* Can only happen if BIG_ARITY_MAX */
1410 	    ret = NIL;              /* is reached, ie should not happen */
1411 	    htop = hstart;
1412 	    break;
1413 	}
1414 	ASSERT(is_integer(newcnt));
1415 
1416 	if (arityval(*tpl) == 4) { /* Maybe warp it */
1417 	    Eterm threshold = tpl[3];
1418 	    if ((CMP(incr,make_small(0)) < 0) ? /* negative increment? */
1419 		(CMP(newcnt,threshold) < 0) :  /* if negative, check if below */
1420 		(CMP(newcnt,threshold) > 0)) { /* else check if above threshold */
1421 
1422 		newcnt = tpl[4];
1423 	    }
1424 	}
1425 
1426 	db_do_update_element(&handle,position,newcnt);
1427 
1428 	if (ret_list_prevp) {
1429 	    *ret_list_prevp = CONS(ret_list_currp,newcnt,NIL);
1430 	    ret_list_prevp = &CDR(ret_list_currp);
1431 	    ret_list_currp += 2;
1432 	}
1433 	else {
1434 	    ret = newcnt;
1435 	    break;
1436 	}
1437     }
1438 
1439     ASSERT(is_integer(ret) || is_nil(ret) ||
1440 	   (is_list(ret) && (list_val(ret)+list_size)==ret_list_currp));
1441     ASSERT(htop <= hend);
1442 
1443     HRelease(p, hend, htop);
1444 
1445 finalize:
1446     tb->common.meth->db_finalize_dbterm(cret, &handle);
1447 
1448 bail_out:
1449     UnUseTmpHeap(5, p);
1450     db_unlock(tb, LCK_WRITE_REC);
1451 
1452     switch (cret) {
1453     case DB_ERROR_NONE:
1454 	BIF_RET(ret);
1455     case DB_ERROR_SYSRES:
1456         BIF_ERROR(p, SYSTEM_LIMIT);
1457     case DB_ERROR_BADPARAM:
1458         BIF_ERROR(p, BADARG | EXF_HAS_EXT_INFO);
1459     default:
1460         BIF_ERROR(p, BADARG);
1461 	break;
1462     }
1463 }
1464 
1465 /*
1466 ** update_counter(Tab, Key, Incr)
1467 ** update_counter(Tab, Key, Upop)
1468 ** update_counter(Tab, Key, [{Upop}])
1469 ** Upop = {Pos,Incr} | {Pos,Incr,Threshold,WarpTo}
1470 ** Returns new value(s) (integer or [integer])
1471 */
ets_update_counter_3(BIF_ALIST_3)1472 BIF_RETTYPE ets_update_counter_3(BIF_ALIST_3)
1473 {
1474     DbTable* tb;
1475 
1476     DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_counter_3);
1477 
1478     return do_update_counter(BIF_P, tb, BIF_ARG_2, BIF_ARG_3, THE_NON_VALUE);
1479 }
1480 
1481 /*
1482 ** update_counter(Tab, Key, Incr, Default)
1483 ** update_counter(Tab, Key, Upop, Default)
1484 ** update_counter(Tab, Key, [{Upop}], Default)
1485 ** Upop = {Pos,Incr} | {Pos,Incr,Threshold,WarpTo}
1486 ** Returns new value(s) (integer or [integer])
1487 */
ets_update_counter_4(BIF_ALIST_4)1488 BIF_RETTYPE ets_update_counter_4(BIF_ALIST_4)
1489 {
1490     DbTable* tb;
1491 
1492     DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_counter_4);
1493 
1494     if (is_not_tuple(BIF_ARG_4)) {
1495         db_unlock(tb, LCK_WRITE_REC);
1496         BIF_ERROR(BIF_P, BADARG);
1497     }
1498 
1499     return do_update_counter(BIF_P, tb, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4);
1500 }
1501 
/*
 * State of a list insert (ets:insert/2, ets:insert_new/2 with a list),
 * stored in ets_insert_2_list_info.status by
 * ets_insert_2_list_lock_tbl().
 */
typedef enum {
    /* Table lock obtained for an insert (or argument check) performed
     * directly by the calling process. */
    ETS_INSERT_2_LIST_PROCESS_LOCAL,
    /* Table lookup failed with p->freason == TRAP. */
    ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK,
    /* Table lookup failed for any other reason; the failure result is
     * kept in ets_insert_2_list_info.destroy_return_value. */
    ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK_DESTROY,
    /* Table lock obtained for inserting the copied term list. */
    ETS_INSERT_2_LIST_GLOBAL
} ets_insert_2_list_status;
1508 
/*
 * Extra context of a yielding list insert; obtained inside the YCF
 * functions through YCF_GET_EXTRA_CONTEXT().
 */
typedef struct {
    /* Current state, see ets_insert_2_list_status. */
    ets_insert_2_list_status status;
    /* Failure result saved when status is
     * ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK_DESTROY. */
    BIF_RETTYPE destroy_return_value;
    /* NOTE(review): not written in the code visible here; presumably
     * the table being inserted into -- confirm against the driver. */
    DbTable* tb;
    /* YCF continuation state; handed to
     * ets_insert_2_list_ycf_gen_destroy() by the magic binary
     * destructor. */
    void* continuation_state;
    /* NOTE(review): presumably the magic binary receiving the result of
     * insert_new -- confirm against the driver. */
    Binary* continuation_res_bin;
} ets_insert_2_list_info;
1516 
1517 
1518 static ERTS_INLINE BIF_RETTYPE
ets_cret_to_return_value(Process * p,int cret)1519 ets_cret_to_return_value(Process* p, int cret)
1520 {
1521     ASSERT(p || cret == DB_ERROR_NONE_FALSE || cret == DB_ERROR_NONE);
1522     switch (cret) {
1523     case DB_ERROR_NONE_FALSE:
1524         BIF_RET(am_false);
1525     case DB_ERROR_NONE:
1526 	BIF_RET(am_true);
1527     case DB_ERROR_SYSRES:
1528 	BIF_ERROR(p, SYSTEM_LIMIT);
1529     default:
1530 	BIF_ERROR(p, BADARG);
1531     }
1532 }
1533 
1534 /*
1535  * > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
1536  * > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
1537  *
1538  * Start of code section that Yielding C Fun (YCF) transforms
1539  *
 * The functions within #ifdef YCF_FUNCTIONS below are not called directly.
1541  * YCF generates yieldable versions of these functions before "erl_db.c" is
1542  * compiled. These generated functions are placed in the file
1543  * "erl_db_insert_list.ycf.h" which is included below. The generation of
1544  * "erl_db_insert_list.ycf.h" is defined in
1545  * "$ERL_TOP/erts/emulator/Makefile.in". See
1546  * "$ERL_TOP/erts/emulator/internal_doc/AutomaticYieldingOfCCode.md"
1547  * for more information about YCF.
1548  *
1549  * > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
1550  * > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
1551  */
1552 
1553 /*
1554  * The LOCAL_VARIABLE macro is a trick to create a local variable that does not
1555  * get renamed by YCF.
1556  * Such variables will not retain their values over yields. Beware!
1557  *
1558  * I use this as a workaround for a limitation/bug in YCF. It does not do
1559  * proper variable name substitution in expressions passed as argument to
1560  * YCF_CONSUME_REDS(Expr).
1561  */
1562 #define LOCAL_VARIABLE(TYPE, NAME) TYPE NAME
1563 
1564 #ifdef YCF_FUNCTIONS
/*
 * Validate the argument list of a list insert.
 *
 * Every element must be a tuple whose arity is at least 'keypos', and
 * the list must be proper (terminated by NIL).
 *
 * Returns the number of elements, or -1 if the list is not acceptable.
 */
static long ets_insert_2_list_check(int keypos, Eterm list)
{
    Eterm cell = THE_NON_VALUE;
    long nr_of_elems = 0;

    for (cell = list; is_list(cell); cell = CDR(list_val(cell))) {
        Eterm elem = CAR(list_val(cell));

        nr_of_elems++;
        if (is_not_tuple(elem)) {
            return -1;
        }
        if (arityval(*tuple_val(elem)) < keypos) {
            return -1;
        }
    }

    /* An improper tail makes the whole list invalid. */
    return (cell == NIL) ? nr_of_elems : -1;
}
1581 
/*
 * Check whether the key of any tuple in 'list' is already present in
 * the table, as reported by the table's db_member method.
 *
 * Returns 1 as soon as db_member reports anything but 'false' for a
 * key, otherwise 0.
 */
static int ets_insert_new_2_list_has_member(DbTable* tb, Eterm list)
{
    DbTableMethod* meth = tb->common.meth;
    Eterm cell;
    Eterm member_res;

    for (cell = list; is_list(cell); cell = CDR(list_val(cell))) {
        Eterm key = TERM_GETKEY(tb, CAR(list_val(cell)));

        meth->db_member(tb, key, &member_res);
        if (member_res != am_false) {
            return 1;
        }
    }
    return 0;
}
1597 
/*
 * Insert every tuple of 'list' with the table's db_put method, taking
 * the terms straight from the list.
 *
 * Stops at the first db_put failure and returns its status; returns
 * DB_ERROR_NONE when every element was put. The reductions reported by
 * db_put (starting value 1 per element) are consumed after each
 * successful put.
 */
static int ets_insert_2_list_from_p_heap(DbTable* tb, Eterm list)
{
    DbTableMethod* meth = tb->common.meth;
    Eterm cell;
    int status = DB_ERROR_NONE;

    for (cell = list; is_list(cell); cell = CDR(list_val(cell))) {
        LOCAL_VARIABLE(SWord, consumed_reds);
        consumed_reds = 1;
        status = meth->db_put(tb, CAR(list_val(cell)), 0, &consumed_reds);
        if (status != DB_ERROR_NONE) {
            return status;
        }
        YCF_CONSUME_REDS(consumed_reds);
    }
    return DB_ERROR_NONE;
}
1613 #endif /* YCF_FUNCTIONS */
1614 
1615 /* This function is called both as is, and as YCF transformed. */
ets_insert_2_list_destroy_copied_dbterms(DbTableMethod * meth,int compressed,void * db_term_list)1616 static void ets_insert_2_list_destroy_copied_dbterms(DbTableMethod* meth,
1617                                                      int compressed,
1618                                                      void* db_term_list)
1619 {
1620     void* lst = db_term_list;
1621     void* term = NULL;
1622     while (lst != NULL) {
1623         term = meth->db_dbterm_list_remove_first(&lst);
1624         meth->db_free_dbterm(compressed, term);
1625     }
1626 }
1627 
1628 #ifdef YCF_FUNCTIONS
/*
 * Convert every tuple of 'list' with db_eterm_to_dbterm() and link the
 * results into a db term list.
 *
 * The first converted term starts the list; each following term is
 * added with db_dbterm_list_prepend(). Returns the resulting list, or
 * NULL if 'list' has no elements.
 */
static void* ets_insert_2_list_copy_term_list(DbTableMethod* meth,
                                              int compress,
                                              int keypos,
                                              Eterm list)
{
    void* db_term_list = NULL;
    void *copied;
    Eterm cell;

    for (cell = list; is_list(cell); cell = CDR(list_val(cell))) {
        copied = meth->db_eterm_to_dbterm(compress,
                                          keypos,
                                          CAR(list_val(cell)));
        if (db_term_list == NULL) {
            db_term_list = copied;
        } else {
            db_term_list = meth->db_dbterm_list_prepend(db_term_list,
                                                        copied);
        }
    }

    return db_term_list;

    /* Cleanup that runs if the calling process is killed in the middle
       of the loop above: free the terms copied so far. */
    YCF_SPECIAL_CODE_START(ON_DESTROY_STATE); {
        ets_insert_2_list_destroy_copied_dbterms(meth,
                                                 compress,
                                                 db_term_list);
    } YCF_SPECIAL_CODE_END();
}
1660 
/*
 * Check whether the key of any term in a copied db term list is already
 * present in the table, as reported by the table's db_member method.
 *
 * The list is walked through a local cursor; the caller's list pointer
 * is not modified. Returns 1 as soon as db_member reports anything but
 * 'false', otherwise 0.
 */
static int ets_insert_new_2_dbterm_list_has_member(DbTable* tb, void* db_term_list)
{
    DbTableMethod* meth = tb->common.meth;
    void* remaining = db_term_list;
    void* current = NULL;
    Eterm member_res;
    Eterm key;

    while (remaining != NULL) {
        current = meth->db_dbterm_list_remove_first(&remaining);
        key = meth->db_get_dbterm_key(tb, current);
        meth->db_member(tb, key, &member_res);
        if (member_res != am_false) {
            return 1;
        }
    }
    return 0;
}
1678 
/*
 * Insert every term of a copied db term list into the table with the
 * table's db_put_dbterm method.
 *
 * Terms are unlinked from a local cursor one at a time. The reductions
 * reported by db_put_dbterm (starting value 1 per term) are consumed
 * after each insert.
 *
 * An empty (NULL) list is a no-op; the loop tests the list before
 * touching it so that db_put_dbterm is never handed a term taken from
 * an empty list.
 */
static void ets_insert_2_list_insert_db_term_list(DbTable* tb,
                                                  void* list)
{
    void* lst = list;
    void* term = NULL;
    DbTableMethod* meth = tb->common.meth;
    while (lst != NULL) {
        LOCAL_VARIABLE(SWord, consumed_reds);
        consumed_reds = 1;
        term = meth->db_dbterm_list_remove_first(&lst);
        meth->db_put_dbterm(tb, term, 0, &consumed_reds);
        YCF_CONSUME_REDS(consumed_reds);
    }
    return;
}
1694 
/*
 * Look up the table 'table_id' for writing with LCK_WRITE, yielding and
 * retrying until the lookup succeeds.
 *
 * Each failed attempt records the reason in the extra context before
 * yielding:
 *   - p->freason == TRAP: status = ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK;
 *   - otherwise: status = ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK_DESTROY
 *     and the failure result is saved in destroy_return_value.
 *
 * On success the context status is set to 'on_success_status'. The
 * table pointer found here is local to this function and is not handed
 * back to the caller.
 */
static void ets_insert_2_list_lock_tbl(Eterm table_id,
                                       Process* p,
                                       Uint bif_ix,
                                       ets_insert_2_list_status on_success_status)
{
    BIF_RETTYPE fail_ret;
    DbTable* tb;
    do {
        fail_ret = db_get_table_or_fail_return(&tb,
                                               table_id,
                                               DB_WRITE,
                                               LCK_WRITE,
                                               bif_ix,
                                               p);
        if (tb == NULL) {
            /* The context is fetched anew on every attempt since it may
             * have moved across a yield (see the DEBUG note below). */
            ets_insert_2_list_info *ctx = YCF_GET_EXTRA_CONTEXT();
            if (p->freason == TRAP) {
                ctx->status = ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK;
            } else {
                ctx->status = ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK_DESTROY;
                ctx->destroy_return_value = fail_ret;
            }
#ifdef DEBUG
            /*
             *  Setting ctx to NULL to avoid that YCF crashes with a
             *  pointer to stack error when running a debug
             *  build. YCF_GET_EXTRA_CONTEXT() may change between
             *  yields as we use stack allocated data for the context
             *  before the first yield so it is important that the
             *  context is obtained again with YCF_GET_EXTRA_CONTEXT()
             *  if a yield might have happened.
             */
            ctx = NULL;
#endif
            /* Hand control back; the loop retries the lookup when the
             * function is resumed. */
            YCF_YIELD();
        } else {
            ets_insert_2_list_info *ctx = YCF_GET_EXTRA_CONTEXT();
            ctx->status = on_success_status;
            ASSERT(DB_LOCK_FREE(tb) || erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
            ASSERT(!(tb->common.status & DB_DELETE));
        }
    } while (tb == NULL);
}
1738 #endif /* YCF_FUNCTIONS */
1739 
can_insert_without_yield(Uint32 tb_type,long list_len,long reds_left)1740 static ERTS_INLINE int can_insert_without_yield(Uint32 tb_type,
1741                                                 long list_len,
1742                                                 long reds_left)
1743 {
1744     if (tb_type & DB_BAG) {
1745         /* Bag inserts can be really bad and we don't know how much searching
1746          * for duplicates we will do */
1747         return 0;
1748     }
1749     else {
1750         return list_len <= reds_left;
1751     }
1752 }
1753 
1754 #ifdef YCF_FUNCTIONS
/*
 * Yielding implementation of ets:insert/2 and ets:insert_new/2 for a
 * list argument (YCF transformed; not called directly).
 *
 *   p             - calling process.
 *   table_id      - table identifier, used to (re)lock the table.
 *   tb            - the table; only its method table, compress flag,
 *                   key position and type are read before any lock is
 *                   taken here.
 *   list          - the list of tuples to insert.
 *   is_insert_new - non-zero for insert_new semantics (nothing is
 *                   inserted if any key already exists).
 *
 * Three paths:
 *   1. The list fails ets_insert_2_list_check(): the table is locked
 *      and unlocked again and a badarg error is prepared.
 *   2. can_insert_without_yield() holds: the table is locked, the
 *      reduction budget is temporarily raised to YCF_MAX_NR_OF_REDS so
 *      that no yield happens, the tuples are inserted straight from
 *      the list, and the table is unlocked.
 *   3. Otherwise: the tuples are first copied into a db term list, the
 *      table is locked with status ETS_INSERT_2_LIST_GLOBAL, and the
 *      copied terms are inserted. This path returns without calling
 *      db_unlock().
 *      NOTE(review): presumably the driver releases the lock -- confirm.
 *
 * NOTE(review): ets_insert_2_list_lock_tbl() looks the table up by
 * 'table_id' but does not return it; the code below keeps using the
 * 'tb' argument after locking and thus relies on both denoting the
 * same table -- confirm.
 */
static BIF_RETTYPE ets_insert_2_list(Process* p,
                                     Eterm table_id,
                                     DbTable *tb,
                                     Eterm list,
                                     int is_insert_new)
{
    int cret = DB_ERROR_NONE;
    void* db_term_list = NULL; /* OBS: memory managements depends on that
                                  db_term_list is initialized to NULL */
    DbTableMethod* meth = tb->common.meth;
    int compressed = tb->common.compress;
    int keypos = tb->common.keypos;
    Uint32 tb_type = tb->common.type;
    Uint bif_ix = (is_insert_new ? BIF_ets_insert_new_2 : BIF_ets_insert_2);
    long list_len;
    /* tb should not be accessed after this point unless the table
       lock is held as the table can get deleted while the function is
       yielding */
    list_len = ets_insert_2_list_check(keypos, list);
    if (list_len < 0) {
        Eterm ret;
        /*
         * Check whether we have sufficient access rights for the
         * table. This is necessary to ensure that the correct reason
         * for the failure will be available in stack backtrace.
         */
        ets_insert_2_list_lock_tbl(table_id, p, bif_ix, ETS_INSERT_2_LIST_PROCESS_LOCAL);
        db_unlock(tb, LCK_WRITE);
        ERTS_BIF_PREP_ERROR_TRAPPED2(ret, p, BADARG, BIF_TRAP_EXPORT(bif_ix), table_id, list);
        return ret;
    }
    if (can_insert_without_yield(tb_type, list_len, YCF_NR_OF_REDS_LEFT())) {
        long reds_boost;
        /* There is enough reductions left to do the inserts directly
           from the heap without yielding */
        ets_insert_2_list_lock_tbl(table_id, p, bif_ix, ETS_INSERT_2_LIST_PROCESS_LOCAL);
        /* Ensure that we will not yield while inserting from heap */
        reds_boost = YCF_MAX_NR_OF_REDS - YCF_NR_OF_REDS_LEFT();
        YCF_SET_NR_OF_REDS_LEFT(YCF_MAX_NR_OF_REDS);
        if (is_insert_new) {
            if (ets_insert_new_2_list_has_member(tb, list)) {
                cret = DB_ERROR_NONE_FALSE;
            } else {
                cret = ets_insert_2_list_from_p_heap(tb, list);
            }
        } else {
            cret = ets_insert_2_list_from_p_heap(tb, list);
        }
        db_unlock(tb, LCK_WRITE);
        /* Take the temporary boost back out of the reduction budget. */
        YCF_SET_NR_OF_REDS_LEFT(YCF_NR_OF_REDS_LEFT() - reds_boost);
        return ets_cret_to_return_value(p, cret);
    }
    /* Copy term list from heap so that other processes can help */
    db_term_list =
        ets_insert_2_list_copy_term_list(meth, compressed, keypos, list);
    /* Lock table */
    ets_insert_2_list_lock_tbl(table_id, p, bif_ix, ETS_INSERT_2_LIST_GLOBAL);
    /* The operation must complete after this point */
    if (is_insert_new) {
        if (ets_insert_new_2_dbterm_list_has_member(tb, db_term_list)) {
            /* A key already exists: nothing is inserted and the copied
               terms are freed. */
            ets_insert_2_list_destroy_copied_dbterms(meth,
                                                     compressed,
                                                     db_term_list);
            cret = DB_ERROR_NONE_FALSE;
        } else {
            ets_insert_2_list_insert_db_term_list(tb, db_term_list);
        }
    } else {
        ets_insert_2_list_insert_db_term_list(tb, db_term_list);
    }
    if (tb->common.continuation != NULL) {
        /* Uninstall the continuation from the table struct */
        tb->common.continuation = NULL;
        if (is_insert_new) {
            /* Publish the insert_new outcome through the result binary
               before dropping this reference to it. */
            int* result_ptr =
                ERTS_MAGIC_BIN_DATA(tb->common.continuation_res_bin);
            *result_ptr = cret;
            erts_bin_release(tb->common.continuation_res_bin);
        }
        /* Put the access bits of the table type back into the status
           word and clear the busy mark. */
        tb->common.status |= tb->common.type & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC);
        tb->common.status &= ~DB_BUSY;
        erts_atomic_set_relb(&tb->common.continuation_state, (Sint)NULL);
    }

    /* cret is DB_ERROR_NONE or DB_ERROR_NONE_FALSE here, so no process
       is needed for the conversion. */
    return ets_cret_to_return_value(NULL, cret);

    /* The following code will be executed if the initiating process
       is killed before an ets_insert_2_list_lock_tbl call has
       succeeded */
    YCF_SPECIAL_CODE_START(ON_DESTROY_STATE); {
        ets_insert_2_list_destroy_copied_dbterms(meth,
                                                 compressed,
                                                 db_term_list);
    } YCF_SPECIAL_CODE_END();
}
1850 #endif /* YCF_FUNCTIONS */
1851 
1852 /*
1853  * < < < < < < < < < < < < < < < < < < < < < < < < < < < < < <
1854  * < < < < < < < < < < < < < < < < < < < < < < < < < < < < < <
1855  *
1856  * End of code section that Yielding C Fun (YCF) transforms
1857  *
1858  * < < < < < < < < < < < < < < < < < < < < < < < < < < < < < <
1859  * < < < < < < < < < < < < < < < < < < < < < < < < < < < < < <
1860  */
1861 #if defined(DEBUG) && defined(ARCH_64)
1862 #include "erl_db_insert_list.debug.ycf.h"
1863 #else
1864 #include "erl_db_insert_list.ycf.h"
1865 #endif
1866 
ets_insert_2_yield_alloc(size_t size,void * ctx)1867 static void* ets_insert_2_yield_alloc(size_t size, void* ctx)
1868 {
1869     (void)ctx;
1870     return erts_alloc(ERTS_ALC_T_ETS_I_LST_TRAP, size);
1871 }
1872 
ets_insert_2_yield_free(void * data,void * ctx)1873 static void ets_insert_2_yield_free(void* data, void* ctx)
1874 {
1875     (void)ctx;
1876     erts_free(ERTS_ALC_T_ETS_I_LST_TRAP, data);
1877 }
1878 
/*
 * Destructor of the magic binary that carries the saved state
 * (ets_insert_2_list_info) of a trapped list insert.
 *
 * A saved continuation is destroyed here only when the operation is not in
 * the ETS_INSERT_2_LIST_GLOBAL state, that is, when it has not been
 * committed to the table and has not completed. Always returns 1.
 */
static int ets_insert_2_list_yield_dtor(Binary* bin)
{
    ets_insert_2_list_info* info = ERTS_MAGIC_BIN_DATA(bin);

    if (info->status == ETS_INSERT_2_LIST_GLOBAL
        || info->continuation_state == NULL) {
        /* Committed to the table, or no continuation left to destroy */
        return 1;
    }

    ets_insert_2_list_ycf_gen_destroy(info->continuation_state);
    return 1;
}
1890 
/*
 * Continuation function that ets_insert_2_list_driver() installs in
 * tb->common.continuation. It resumes the saved YCF state in '*state' with
 * the reduction budget in '*reds_ptr'; the value returned by the resumed
 * function is discarded.
 */
static void ets_insert_2_list_continuation(long *reds_ptr,
                                           void** state,
                                           void* extra_context)
{
#if defined(DEBUG) && defined(ARCH_64)
    /* Debug builds on 64-bit: mark the stack start for the YCF debug checks */
    ycf_debug_set_stack_start(reds_ptr);
#endif

    (void) ets_insert_2_list_ycf_gen_continue(reds_ptr, state, extra_context);

#if defined(DEBUG) && defined(ARCH_64)
    ycf_debug_reset_stack_start();
#endif
}
1903 
/*
 * Destructor of the magic binary that ets_insert_2_list_driver() creates to
 * hold the int result of a helped insert_new operation. The payload needs
 * no cleanup, so this only reports 1 to the caller.
 */
static int db_insert_new_2_res_bin_dtor(Binary *context_bin)
{
    /* The binary itself is not inspected */
    (void)context_bin;

    return 1;
}
1909 
/*
 * Scaling factor between process reductions and the iteration budget used
 * by the insert BIFs: the reductions left are multiplied by this value to
 * get a budget, and consumed amounts are divided by it before BUMP_REDS.
 */
#define ITERATIONS_PER_RED 8

/*
 * Common driver for the list variants of ets:insert/2 and ets:insert_new/2.
 *
 *   p             - the calling process
 *   tid           - the table identifier argument of the BIF
 *   list          - on the first call, the term to insert; when re-entered
 *                   after a trap, the magic ref created below that refers
 *                   to the saved ets_insert_2_list_info
 *   is_insert_new - non-zero for insert_new; selects the BIF index used for
 *                   trapping and enables the shared result binary
 *
 * Returns the BIF result. While the YCF continuation state is non-NULL the
 * function traps back to the same BIF with (tid, state_mref).
 */
static BIF_RETTYPE ets_insert_2_list_driver(Process* p,
                                            Eterm tid,
                                            Eterm list,
                                            int is_insert_new) {
    /* Iteration budget for this call, derived from the reductions left */
    const long reds = ITERATIONS_PER_RED * ERTS_BIF_REDS_LEFT(p);
    long nr_of_reductions = DBG_RANDOM_REDS(reds, (Uint)&p);
    const long init_reds = nr_of_reductions;
    ets_insert_2_list_info* ctx = NULL;
    ets_insert_2_list_info ictx;
    BIF_RETTYPE ret = THE_NON_VALUE;
    Eterm state_mref = list;
    Uint bix = (is_insert_new ? BIF_ets_insert_new_2 : BIF_ets_insert_2);
    if (is_internal_magic_ref(state_mref)) {
        Binary* state_bin = erts_magic_ref2bin(state_mref);
        /* Only accept magic refs whose binary was created by this driver */
        if (ERTS_MAGIC_BIN_DESTRUCTOR(state_bin) != ets_insert_2_list_yield_dtor) {
            BIF_ERROR(p, BADARG);
        }
        /* Continue a trapped call */
        erts_set_gc_state(p, 1);
        ctx = ERTS_MAGIC_BIN_DATA(state_bin);
        if (ctx->status == ETS_INSERT_2_LIST_GLOBAL) {
            /* An operation that can be helped by other operations is
               handled here */
            Uint freason;
            int cret = DB_ERROR_NONE;
            DbTable* tb;
            /* First check if another process has completed the
               operation without acquiring the lock */
            tb = db_get_table(p, tid, DB_READ_TBL_STRUCT, NOLCK_ACCESS, &freason);
            ASSERT(tb || freason != TRAP);
            if (tb != NULL &&
                (void*)erts_atomic_read_acqb(&tb->common.continuation_state) ==
                ctx->continuation_state) {
                /* The lock has to be taken to complete the operation */
                if (NULL == (tb = db_get_table(p, tid, DB_WRITE, LCK_WRITE, &freason))) {
                    if (freason == TRAP){
                        erts_set_gc_state(p, 0);
                        return db_bif_fail(p, freason, bix, NULL);
                    }
                    /* Any other failure reason falls through with tb == NULL */
                }
                /* Must be done since the db_get_table call did not trap */
                if (tb != NULL) {
                    db_unlock(tb, LCK_WRITE);
                }
            }
            if (is_insert_new) {
                /* The outcome of insert_new is read from the result binary */
                int* res = ERTS_MAGIC_BIN_DATA(ctx->continuation_res_bin);
                cret = *res;
            }
            return ets_cret_to_return_value(NULL, cret);
        } else {
            /* Not yet global: resume the saved continuation in this process */
#if defined(DEBUG) && defined(ARCH_64)
            ycf_debug_set_stack_start(&nr_of_reductions);
#endif
            ret = ets_insert_2_list_ycf_gen_continue(&nr_of_reductions,
                                                     &ctx->continuation_state,
                                                     ctx);
#if defined(DEBUG) && defined(ARCH_64)
            ycf_debug_reset_stack_start();
#endif
        }
    } else {
        /* Start call */
        ictx.continuation_state = NULL;
        ictx.status = ETS_INSERT_2_LIST_PROCESS_LOCAL;
        ictx.tb = NULL;
        ctx = &ictx;
        DB_GET_TABLE(ctx->tb, tid, DB_READ_TBL_STRUCT, NOLCK_ACCESS, bix, NULL, p);
#if defined(DEBUG) && defined(ARCH_64)
        ycf_debug_set_stack_start(&nr_of_reductions);
#endif
        ret = ets_insert_2_list_ycf_gen_yielding(&nr_of_reductions,
                                                 &ctx->continuation_state,
                                                 ctx,
                                                 ets_insert_2_yield_alloc,
                                                 ets_insert_2_yield_free,
                                                 NULL,
                                                 0,
                                                 NULL,
                                                 p,
                                                 tid,
                                                 ctx->tb,
                                                 list,
                                                 is_insert_new);
#if defined(DEBUG) && defined(ARCH_64)
        ycf_debug_reset_stack_start();
#endif
        if (ctx->continuation_state != NULL) {
            /* The call yielded: move the stack-allocated context into a
               magic binary so that it survives the trap */
            Binary* state_bin = erts_create_magic_binary(sizeof(ets_insert_2_list_info),
                                                         ets_insert_2_list_yield_dtor);
            Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
            state_mref = erts_mk_magic_ref(&hp, &MSO(p), state_bin);
            ctx = ERTS_MAGIC_BIN_DATA(state_bin);
            *ctx = ictx;
        }
    }
    /* Charge the process for the iterations used, scaled back to reductions */
    BUMP_REDS(p, (init_reds - nr_of_reductions) / ITERATIONS_PER_RED);
    if (ctx->status == ETS_INSERT_2_LIST_GLOBAL &&
        ctx->continuation_state != NULL &&
        ctx->tb->common.continuation == NULL) {
        /* Install the continuation in the table structure so other
           threads can help */
        if (is_insert_new) {
            /* Result cell referenced both from the table and from ctx.
               NOTE(review): the extra reference taken by erts_refc_inctest
               is presumably the one held by the table; confirm against the
               erts_bin_release done when the continuation is uninstalled. */
            Binary* bin =
                erts_create_magic_binary(sizeof(int),
                                         db_insert_new_2_res_bin_dtor);
            Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
            erts_mk_magic_ref(&hp, &MSO(p), bin);
            erts_refc_inctest(&bin->intern.refc, 2);
            ctx->tb->common.continuation_res_bin = bin;
            ctx->continuation_res_bin = bin;
        }
        ctx->tb->common.continuation = ets_insert_2_list_continuation;
        /* Clear the access bits and mark the table busy while the
           continuation is installed */
        ctx->tb->common.status &= ~(DB_PRIVATE|DB_PROTECTED|DB_PUBLIC);
        ctx->tb->common.status |= DB_BUSY;
        erts_atomic_set_relb(&ctx->tb->common.continuation_state,
                             (Sint)ctx->continuation_state);
    }
    if (ctx->status == ETS_INSERT_2_LIST_FAILED_TO_GET_LOCK_DESTROY) {
        return ctx->destroy_return_value;
    }
    if (ctx->status == ETS_INSERT_2_LIST_GLOBAL) {
        /* In the global state the table write lock is held at this point */
        db_unlock(ctx->tb, LCK_WRITE);
    }
    if (ctx->continuation_state != NULL) {
        /* Not finished: trap back to the same BIF with the state ref */
        erts_set_gc_state(p, 0);
        BIF_TRAP2(BIF_TRAP_EXPORT(bix), p, tid, state_mref);
    }
    return ret;
}
2042 
2043 /*
2044 ** The put BIF
2045 */
ets_insert_2(BIF_ALIST_2)2046 BIF_RETTYPE ets_insert_2(BIF_ALIST_2)
2047 {
2048     DbTable* tb;
2049     int cret = DB_ERROR_NONE;
2050     Eterm insert_term;
2051     DbTableMethod* meth;
2052     SWord consumed_reds = 0;
2053     CHECK_TABLES();
2054     if (BIF_ARG_2 == NIL) {
2055         /* Check that the table exists */
2056         DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_insert_2);
2057         db_unlock(tb, LCK_WRITE_REC);
2058 	BIF_RET(am_true);
2059     } if ((is_list(BIF_ARG_2) && CDR(list_val(BIF_ARG_2)) != NIL) ||
2060           is_internal_magic_ref(BIF_ARG_2)) {
2061         /* Handle list case */
2062        return ets_insert_2_list_driver(BIF_P,
2063                                        BIF_ARG_1,
2064                                        BIF_ARG_2,
2065                                        0);
2066     } else if (is_list(BIF_ARG_2)) {
2067         insert_term = CAR(list_val(BIF_ARG_2));
2068     } else {
2069         insert_term = BIF_ARG_2;
2070     }
2071 
2072     DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_insert_2);
2073 
2074     meth = tb->common.meth;
2075     if (is_not_tuple(insert_term) ||
2076         (arityval(*tuple_val(insert_term)) < tb->common.keypos)) {
2077         db_unlock(tb, LCK_WRITE_REC);
2078         BIF_ERROR(BIF_P, BADARG);
2079     }
2080     cret = meth->db_put(tb, insert_term, 0, &consumed_reds);
2081 
2082     db_unlock(tb, LCK_WRITE_REC);
2083 
2084     BUMP_REDS(BIF_P, consumed_reds / ITERATIONS_PER_RED);
2085     return ets_cret_to_return_value(BIF_P, cret);
2086 }
2087 
2088 
2089 /*
2090 ** The put-if-not-already-there BIF...
2091 */
ets_insert_new_2(BIF_ALIST_2)2092 BIF_RETTYPE ets_insert_new_2(BIF_ALIST_2)
2093 {
2094     DbTable* tb;
2095     int cret = DB_ERROR_NONE;
2096     Eterm ret = am_true;
2097     Eterm obj;
2098     db_lock_kind_t kind;
2099     SWord consumed_reds = 0;
2100     CHECK_TABLES();
2101 
2102     if (BIF_ARG_2 == NIL) {
2103         /* Check that the table exists */
2104         DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_insert_2);
2105         db_unlock(tb, LCK_WRITE_REC);
2106 	BIF_RET(am_true);
2107     } if ((is_list(BIF_ARG_2) && CDR(list_val(BIF_ARG_2)) != NIL) ||
2108           is_internal_magic_ref(BIF_ARG_2)) {
2109         /* Handle list case */
2110         return ets_insert_2_list_driver(BIF_P, BIF_ARG_1, BIF_ARG_2, 1);
2111     } else if (is_list(BIF_ARG_2)) {
2112         obj = CAR(list_val(BIF_ARG_2));
2113     } else {
2114         obj = BIF_ARG_2;
2115     }
2116 
2117     /* Only one object */
2118     kind = LCK_WRITE_REC;
2119     DB_BIF_GET_TABLE(tb, DB_WRITE, kind, BIF_ets_insert_new_2);
2120 
2121     if (is_not_tuple(obj)
2122 	|| (arityval(*tuple_val(obj)) < tb->common.keypos)) {
2123         db_unlock(tb, kind);
2124         BIF_ERROR(BIF_P, BADARG);
2125     }
2126     cret = tb->common.meth->db_put(tb, obj,
2127 				   1,  /* key_clash_fail */
2128                                    &consumed_reds);
2129 
2130     db_unlock(tb, kind);
2131 
2132     BUMP_REDS(BIF_P, consumed_reds / ITERATIONS_PER_RED);
2133     switch (cret) {
2134     case DB_ERROR_NONE:
2135 	BIF_RET(ret);
2136     case DB_ERROR_BADKEY:
2137 	BIF_RET(am_false);
2138     case DB_ERROR_SYSRES:
2139 	BIF_ERROR(BIF_P, SYSTEM_LIMIT);
2140     default:
2141 	BIF_ERROR(BIF_P, BADARG);
2142     }
2143 }
2144 
2145 /*
2146 ** Rename a (possibly) named table
2147 */
2148 
ets_rename_2(BIF_ALIST_2)2149 BIF_RETTYPE ets_rename_2(BIF_ALIST_2)
2150 {
2151     DbTable* tb;
2152     Eterm ret;
2153     Eterm old_name;
2154     erts_rwmtx_t *lck1, *lck2;
2155     Uint freason;
2156 
2157 #ifdef HARDDEBUG
2158     erts_fprintf(stderr,
2159 		"ets:rename(%T,%T); Process: %T, initial: %T:%T/%bpu\n",
2160 		BIF_ARG_1, BIF_ARG_2, BIF_P->common.id,
2161 		BIF_P->u.initial[0], BIF_P->u.initial[1], BIF_P->u.initial[2]);
2162 #endif
2163 
2164 
2165     if (is_not_atom(BIF_ARG_2)) {
2166         /* Do lookup to report bad table identifier or table name. */
2167         DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_READ, BIF_ets_rename_2);
2168         db_unlock(tb, LCK_READ);
2169         BIF_ERROR(BIF_P, BADARG);
2170     }
2171 
2172     (void) meta_name_tab_bucket(BIF_ARG_2, &lck1);
2173 
2174     if (is_atom(BIF_ARG_1)) {
2175         old_name = BIF_ARG_1;
2176     named_tab:
2177 	(void) meta_name_tab_bucket(old_name, &lck2);
2178 	if (lck1 == lck2)
2179 	    lck2 = NULL;
2180 	else if (lck1 > lck2) {
2181 	    erts_rwmtx_t *tmp = lck1;
2182 	    lck1 = lck2;
2183 	    lck2 = tmp;
2184 	}
2185     }
2186     else {
2187         tb = tid2tab(BIF_ARG_1, &BIF_P->fvalue);
2188         if (!tb)
2189             BIF_ERROR(BIF_P, BADARG | EXF_HAS_EXT_INFO);
2190         else {
2191             if (is_table_named(tb)) {
2192                 old_name = tb->common.the_name;
2193                 goto named_tab;
2194             }
2195             lck2 = NULL;
2196         }
2197     }
2198 
2199     erts_rwmtx_rwlock(lck1);
2200     if (lck2)
2201 	erts_rwmtx_rwlock(lck2);
2202 
2203     tb = db_get_table_aux(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE, 1, &freason);
2204     if (!tb)
2205 	goto fail;
2206 
2207     if (is_table_named(tb)) {
2208         if (!insert_named_tab(BIF_ARG_2, tb, 1))
2209             goto badarg;
2210 
2211         if (!remove_named_tab(tb, 1))
2212             erts_exit(ERTS_ERROR_EXIT,"Could not find named tab %s", tb->common.the_name);
2213         ret = BIF_ARG_2;
2214     }
2215     else { /* Not a named table */
2216         ret = BIF_ARG_1;
2217     }
2218     tb->common.the_name = BIF_ARG_2;
2219 
2220     db_unlock(tb, LCK_WRITE);
2221     erts_rwmtx_rwunlock(lck1);
2222     if (lck2)
2223 	erts_rwmtx_rwunlock(lck2);
2224     BIF_RET(ret);
2225 
2226 badarg:
2227     freason = BADARG;
2228 
2229 fail:
2230     if (tb)
2231 	db_unlock(tb, LCK_WRITE);
2232     erts_rwmtx_rwunlock(lck1);
2233     if (lck2)
2234 	erts_rwmtx_rwunlock(lck2);
2235 
2236     return db_bif_fail(BIF_P, freason, BIF_ets_rename_2, NULL);
2237 }
2238 
2239 
2240 /*
2241 ** The create table BIF
2242 ** Args: (Name, Properties)
2243 */
2244 
/*
 * ets:new(Name, Options)
 *
 * Parses the option list, selects the table implementation (db_hash,
 * db_tree or db_catree), allocates and initializes the table and registers
 * it with the calling process as owner.
 *
 * Options handled: bag, duplicate_bag, ordered_set, set, public, private,
 * protected, named_table, compressed, {keypos, Pos},
 * {write_concurrency, Bool}, {read_concurrency, Bool},
 * {decentralized_counters, Bool}, {heir, none} and {heir, Pid, Data}.
 *
 * Returns Name for a named table and a new table identifier otherwise.
 * Fails with BADARG if Name is not an atom, if Options is not a proper list
 * of known options, or if the name of a named table cannot be inserted.
 */
BIF_RETTYPE ets_new_2(BIF_ALIST_2)
{
    DbTable* tb = NULL;
    Eterm list;
    Eterm val;
    Eterm ret;
    Eterm heir;
    UWord heir_data;
    Uint32 status;
    Sint keypos;
    int is_named, is_compressed;
    int is_fine_locked, frequent_read;
    int is_decentralized_counters;
    int is_decentralized_counters_option;
    int cret;
    DbTableMethod* meth;

    if (is_not_atom(BIF_ARG_1)) {
	BIF_ERROR(BIF_P, BADARG);
    }
    if (is_not_nil(BIF_ARG_2) && is_not_list(BIF_ARG_2)) {
	BIF_ERROR(BIF_P, BADARG);
    }

    /* Defaults, used when the option list does not say otherwise */
    status = DB_SET | DB_PROTECTED;
    keypos = 1;
    is_named = 0;
    is_fine_locked = 0;
    frequent_read = 0;
    is_decentralized_counters = 0;
    is_decentralized_counters_option = -1; /* -1: option not given */
    heir = am_none;
    heir_data = (UWord) am_undefined;
    is_compressed = erts_ets_always_compress;

    /*
     * Walk the option list. Any 'break' leaves 'list' pointing at the
     * offending cell, which the is_not_nil() check after the loop turns
     * into a BADARG.
     */
    list = BIF_ARG_2;
    while(is_list(list)) {
	val = CAR(list_val(list));
	if (val == am_bag) {
	    status |= DB_BAG;
	    status &= ~(DB_SET | DB_DUPLICATE_BAG | DB_ORDERED_SET | DB_CA_ORDERED_SET);
	}
	else if (val == am_duplicate_bag) {
	    status |= DB_DUPLICATE_BAG;
	    status &= ~(DB_SET | DB_BAG | DB_ORDERED_SET | DB_CA_ORDERED_SET);
	}
	else if (val == am_ordered_set) {
            /* Default for ordered_set; an explicit
               {decentralized_counters, Bool} option overrides it below */
            is_decentralized_counters = 1;
	    status |= DB_ORDERED_SET;
	    status &= ~(DB_SET | DB_BAG | DB_DUPLICATE_BAG | DB_CA_ORDERED_SET);
	}
	else if (is_tuple(val)) {
	    Eterm *tp = tuple_val(val);
	    if (arityval(tp[0]) == 2) {
		/* A keypos that is not a positive small integer falls
		   through to the final 'else break' of this chain */
		if (tp[1] == am_keypos
		    && is_small(tp[2]) && (signed_val(tp[2]) > 0)) {
		    keypos = signed_val(tp[2]);
		}
		else if (tp[1] == am_write_concurrency) {
                    if (tp[2] == am_true) {
                        is_fine_locked = 1;
                    } else if (tp[2] == am_false) {
                        is_fine_locked = 0;
                    } else break;
                    /* Fine-grained locking is switched off when
                       DB_LOCK_FREE(NULL) holds */
                    if (DB_LOCK_FREE(NULL))
			is_fine_locked = 0;
		}
		else if (tp[1] == am_read_concurrency) {
		    if (tp[2] == am_true) {
			frequent_read = 1;
		    } else if (tp[2] == am_false) {
			frequent_read = 0;
		    } else break;
		}
		else if (tp[1] == am_heir && tp[2] == am_none) {
		    heir = am_none;
		    heir_data = am_undefined;
		}
                else if (tp[1] == am_decentralized_counters) {
		    if (tp[2] == am_true) {
			is_decentralized_counters_option = 1;
		    } else if (tp[2] == am_false) {
			is_decentralized_counters_option = 0;
		    } else break;
                }
		else break;
	    }
	    /* {heir, Pid, HeirData}; Pid must be an internal pid */
	    else if (arityval(tp[0]) == 3 && tp[1] == am_heir
		     && is_internal_pid(tp[2])) {
		heir = tp[2];
		heir_data = tp[3];
	    }
	    else break;
	}
	else if (val == am_public) {
	    status |= DB_PUBLIC;
	    status &= ~(DB_PROTECTED|DB_PRIVATE);
	}
	else if (val == am_private) {
	    status |= DB_PRIVATE;
	    status &= ~(DB_PROTECTED|DB_PUBLIC);
	}
	else if (val == am_named_table) {
	    is_named = 1;
            status |= DB_NAMED_TABLE;
	}
	else if (val == am_compressed) {
	    is_compressed = 1;
	}
	/* Accepted without changing the type or protection bits */
	else if (val == am_set || val == am_protected)
	    ;
	else break;

	list = CDR(list_val(list));
    }
    if (is_not_nil(list)) { /* bad opt or not a well formed list */
	BIF_ERROR(BIF_P, BADARG);
    }
    if (-1 != is_decentralized_counters_option) {
        is_decentralized_counters = is_decentralized_counters_option;
    }
    /* Select the implementation from the table type and locking options */
    if (IS_TREE_TABLE(status) && is_fine_locked && !(status & DB_PRIVATE)) {
        meth = &db_catree;
        status |= DB_CA_ORDERED_SET;
        status &= ~(DB_SET | DB_BAG | DB_DUPLICATE_BAG | DB_ORDERED_SET);
        status |= DB_FINE_LOCKED;
    }
    else if (IS_HASH_TABLE(status)) {
	meth = &db_hash;
	if (is_fine_locked && !(status & DB_PRIVATE)) {
	    status |= DB_FINE_LOCKED;
	}
    }
    else if (IS_TREE_TABLE(status)) {
	meth = &db_tree;
    }
    else {
	BIF_ERROR(BIF_P, BADARG);
    }

    if (frequent_read && !(status & DB_PRIVATE))
	status |= DB_FREQ_READ;

    /* we create table outside any table lock
     * and take the unusual cost of destroy table if it
     * fails to find a slot
     */
    {
        /* Temporary table with initialized counters, passed to
           erts_db_alloc(); the memory it has accounted for is moved to the
           counters of the real table below */
        DbTable init_tb;
        erts_flxctr_init(&init_tb.common.counters, 0, 2, ERTS_ALC_T_ETS_CTRS);
	tb = (DbTable*) erts_db_alloc(ERTS_ALC_T_DB_TABLE,
				      &init_tb, sizeof(DbTable));
        erts_flxctr_init(&tb->common.counters,
                         (status & DB_FINE_LOCKED) && is_decentralized_counters,
                         2,
                         ERTS_ALC_T_ETS_CTRS);
        erts_flxctr_add(&tb->common.counters,
                        ERTS_DB_TABLE_MEM_COUNTER_ID,
                        DB_GET_APPROX_MEM_CONSUMED(&init_tb) +
                        erts_flxctr_nr_of_allocated_bytes(&tb->common.counters));
    }

    tb->common.meth = meth;
    tb->common.the_name = BIF_ARG_1;
    tb->common.status = status;
    tb->common.type = status;
    /* Note, 'type' is *read only* from now on... */
    tb->common.continuation = NULL;
    erts_atomic_set_nob(&tb->common.continuation_state, (Sint)NULL);
    erts_refc_init(&tb->common.fix_count, 0);
    db_init_lock(tb, status & (DB_FINE_LOCKED|DB_FREQ_READ));
    tb->common.keypos = keypos;
    tb->common.owner = BIF_P->common.id;
    set_heir(BIF_P, tb, heir, heir_data);

    tb->common.fixing_procs = NULL;
    tb->common.compress = is_compressed;
#ifdef ETS_DBG_FORCE_TRAP
    tb->common.dbg_force_trap = erts_ets_dbg_force_trap;
#endif

    cret = meth->db_create(BIF_P, tb);
    ASSERT(cret == DB_ERROR_NONE); (void)cret;

    make_btid(tb);

    /* A named table is referred to by its name, others by a fresh tid */
    if (is_named)
        ret = BIF_ARG_1;
    else
        ret = make_tid(BIF_P, tb);

    save_sched_table(BIF_P, tb);
    save_owned_table(BIF_P, tb);

    if (is_named && !insert_named_tab(BIF_ARG_1, tb, 0)) {
        /* The name could not be inserted: undo the registration done
           above and free the still empty table */
        tid_clear(BIF_P, tb);
        delete_owned_table(BIF_P, tb);

	db_lock(tb,LCK_WRITE);
	free_heir_data(tb);
	tb->common.meth->db_free_empty_table(tb);
	db_unlock(tb,LCK_WRITE);
        table_dec_refc(tb, 0);
	BIF_ERROR(BIF_P, BADARG);
    }

    BIF_P->flags |= F_USING_DB; /* So we can remove tb if p dies */

#ifdef HARDDEBUG
    erts_fprintf(stderr,
		"ets:new(%T,%T)=%T; Process: %T, initial: %T:%T/%bpu\n",
		 BIF_ARG_1, BIF_ARG_2, ret, BIF_P->common.id,
		 BIF_P->u.initial[0], BIF_P->u.initial[1], BIF_P->u.initial[2]);
#endif

    BIF_RET(ret);
}
2462 
2463 /*
2464 ** Retrieves the tid() of a named ets table.
2465 */
ets_whereis_1(BIF_ALIST_1)2466 BIF_RETTYPE ets_whereis_1(BIF_ALIST_1)
2467 {
2468     DbTable* tb;
2469     Eterm res;
2470     Uint freason;
2471 
2472     if (is_not_atom(BIF_ARG_1)) {
2473         BIF_ERROR(BIF_P, BADARG);
2474     }
2475 
2476     if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ, &freason)) == NULL) {
2477         if (BIF_P->fvalue == EXI_ID) {
2478             BIF_RET(am_undefined);
2479         } else {
2480             //ToDo: Could we avoid this
2481             return db_bif_fail(BIF_P, freason, BIF_ets_whereis_1, NULL);
2482         }
2483     }
2484 
2485     res = make_tid(BIF_P, tb);
2486     db_unlock(tb, LCK_READ);
2487 
2488     BIF_RET(res);
2489 }
2490 
2491 /*
2492 ** The lookup BIF
2493 */
ets_lookup_2(BIF_ALIST_2)2494 BIF_RETTYPE ets_lookup_2(BIF_ALIST_2)
2495 {
2496     DbTable* tb;
2497     int cret;
2498     Eterm ret;
2499 
2500     CHECK_TABLES();
2501 
2502     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_lookup_2);
2503 
2504     cret = tb->common.meth->db_get(BIF_P, tb, BIF_ARG_2, &ret);
2505 
2506     db_unlock(tb, LCK_READ);
2507 
2508     switch (cret) {
2509     case DB_ERROR_NONE:
2510 	BIF_RET(ret);
2511     case DB_ERROR_SYSRES:
2512 	BIF_ERROR(BIF_P, SYSTEM_LIMIT);
2513     default:
2514 	BIF_ERROR(BIF_P, BADARG);
2515     }
2516 
2517 }
2518 
2519 /*
** The member BIF
2521 */
ets_member_2(BIF_ALIST_2)2522 BIF_RETTYPE ets_member_2(BIF_ALIST_2)
2523 {
2524     DbTable* tb;
2525     int cret;
2526     Eterm ret;
2527 
2528     CHECK_TABLES();
2529 
2530     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_member_2);
2531 
2532     cret = tb->common.meth->db_member(tb, BIF_ARG_2, &ret);
2533 
2534     db_unlock(tb, LCK_READ);
2535 
2536     switch (cret) {
2537     case DB_ERROR_NONE:
2538 	BIF_RET(ret);
2539     case DB_ERROR_SYSRES:
2540 	BIF_ERROR(BIF_P, SYSTEM_LIMIT);
2541     default:
2542 	BIF_ERROR(BIF_P, BADARG);
2543     }
2544 
2545 }
2546 
2547 /*
2548 ** Get an element from a term
2549 ** get_element_3(Tab, Key, Index)
2550 ** return the element or a list of elements if bag
2551 */
ets_lookup_element_3(BIF_ALIST_3)2552 BIF_RETTYPE ets_lookup_element_3(BIF_ALIST_3)
2553 {
2554     DbTable* tb;
2555     Sint index;
2556     int cret;
2557     Eterm ret;
2558 
2559     CHECK_TABLES();
2560 
2561     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_lookup_element_3);
2562 
2563     if (is_not_small(BIF_ARG_3) || ((index = signed_val(BIF_ARG_3)) < 1)) {
2564 	db_unlock(tb, LCK_READ);
2565 	BIF_ERROR(BIF_P, BADARG);
2566     }
2567 
2568     cret = tb->common.meth->db_get_element(BIF_P, tb,
2569 					   BIF_ARG_2, index, &ret);
2570     db_unlock(tb, LCK_READ);
2571     switch (cret) {
2572     case DB_ERROR_NONE:
2573 	BIF_RET(ret);
2574     case DB_ERROR_SYSRES:
2575 	BIF_ERROR(BIF_P, SYSTEM_LIMIT);
2576     case DB_ERROR_BADKEY:
2577         BIF_P->fvalue = EXI_BAD_KEY;
2578         BIF_ERROR(BIF_P, BADARG | EXF_HAS_EXT_INFO);
2579     default:
2580 	BIF_ERROR(BIF_P, BADARG);
2581     }
2582 }
2583 
2584 /*
2585  * BIF to erase a whole table and release all memory it holds
2586  */
/*
 * ets:delete(Tab)
 *
 * Marks the table as being deleted, makes the calling process its owner if
 * it is not already, removes the name of a named table, disables
 * inheritance and frees the fixations. The table contents are then freed by
 * free_table_continue(); if that returns a negative reduction count the BIF
 * traps to ets_delete_continue_exp, otherwise it returns true.
 */
BIF_RETTYPE ets_delete_1(BIF_ALIST_1)
{
    SWord initial_reds = ERTS_BIF_REDS_LEFT(BIF_P);
    SWord reds = initial_reds;
    DbTable* tb;

#ifdef HARDDEBUG
    erts_fprintf(stderr,
		"ets:delete(%T); Process: %T, initial: %T:%T/%bpu\n",
		BIF_ARG_1, BIF_P->common.id,
		BIF_P->u.initial[0], BIF_P->u.initial[1], BIF_P->u.initial[2]);
#endif

    CHECK_TABLES();

    DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE, BIF_ets_delete_1);

    /*
     * Clear all access bits to prevent any ets operation to access the
     * table while it is being deleted.
     */
    tb->common.status &= ~(DB_PROTECTED|DB_PUBLIC|DB_PRIVATE);
    tb->common.status |= DB_DELETE;

    if (tb->common.owner != BIF_P->common.id) {

	/*
	 * The table is being deleted by a process other than its owner.
	 * To make sure that the table will be completely deleted if the
	 * current process will be killed (e.g. by an EXIT signal), we will
	 * now transfer the ownership to the current process.
	 */

        Process *rp = erts_proc_lookup_raw(tb->common.owner);
        /*
         * Process 'rp' might be exiting, but our table lock prevents it
         * from terminating as it cannot complete erts_db_process_exiting().
         */
        ASSERT(!(ERTS_PSFLG_FREE & erts_atomic32_read_nob(&rp->state)));

        delete_owned_table(rp, tb);
        BIF_P->flags |= F_USING_DB;
        tb->common.owner = BIF_P->common.id;
        save_owned_table(BIF_P, tb);
    }

    if (is_table_named(tb))
	remove_named_tab(tb, 0);

    /* disable inheritance */
    free_heir_data(tb);
    tb->common.heir = am_none;

    /* The reductions spent on freeing fixations count against the budget */
    reds -= free_fixations_locked(BIF_P, tb);
    tid_clear(BIF_P, tb);
    db_unlock(tb, LCK_WRITE);

    /* Free the table contents within the remaining reduction budget */
    reds = free_table_continue(BIF_P, tb, reds);
    if (reds < 0) {
	/*
	 * Package the DbTable* pointer into a bignum so that it can be safely
	 * passed through a trap. We used to pass the DbTable* pointer directly
	 * (it looks like a continuation pointer), but that will crash the
	 * emulator if this BIF is call traced.
	 */
	Eterm *hp = HAlloc(BIF_P, 2);
	hp[0] = make_pos_bignum_header(1);
	hp[1] = (Eterm) tb;
        BUMP_ALL_REDS(BIF_P);
	BIF_TRAP1(&ets_delete_continue_exp, BIF_P, make_big(hp));
    }
    else {
        /* Done in one go: charge only the reductions actually used */
        BUMP_REDS(BIF_P, (initial_reds - reds));
	BIF_RET(am_true);
    }
}
2663 
2664 /*
2665 ** BIF ets:give_away(Tab, Pid, GiftData)
2666 */
ets_give_away_3(BIF_ALIST_3)2667 BIF_RETTYPE ets_give_away_3(BIF_ALIST_3)
2668 {
2669     Process* to_proc = NULL;
2670     ErtsProcLocks to_locks = ERTS_PROC_LOCK_MAIN;
2671     Eterm to_pid = BIF_ARG_2;
2672     Eterm from_pid;
2673     DbTable* tb = NULL;
2674     Uint freason;
2675 
2676     /*
2677      * Note that lock of the process must be taken before the lock
2678      * of the table.
2679      */
2680     to_proc = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, to_pid, to_locks);
2681     /*
2682      * If the table identifier has a problem, we want to report that even if
2683      * the Pid is bad.
2684      */
2685     tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE, &freason);
2686     if (!tb)
2687         goto fail;
2688 
2689     if (!to_proc) {
2690         freason = BADARG;
2691         goto fail;
2692     }
2693 
2694     if (tb->common.owner != BIF_P->common.id) {
2695 	BIF_P->fvalue = EXI_NOT_OWNER;
2696 	freason = BADARG | EXF_HAS_EXT_INFO;
2697 	goto fail;
2698     }
2699 
2700     from_pid = tb->common.owner;
2701     if (to_pid == from_pid) {
2702 	BIF_P->fvalue = EXI_OWNER;
2703 	freason = BADARG | EXF_HAS_EXT_INFO;
2704 	goto fail;
2705     }
2706 
2707     delete_owned_table(BIF_P, tb);
2708     to_proc->flags |= F_USING_DB;
2709     tb->common.owner = to_pid;
2710     save_owned_table(to_proc, tb);
2711 
2712     db_unlock(tb,LCK_WRITE);
2713     send_ets_transfer_message(BIF_P, to_proc, &to_locks,
2714                               tb, BIF_ARG_3);
2715     erts_proc_unlock(to_proc, to_locks);
2716     UnUseTmpHeap(5,BIF_P);
2717     BIF_RET(am_true);
2718 
2719  fail:
2720     if (to_proc != NULL && to_proc != BIF_P) erts_proc_unlock(to_proc, to_locks);
2721     if (tb != NULL) db_unlock(tb, LCK_WRITE);
2722 
2723     return db_bif_fail(BIF_P, freason, BIF_ets_give_away_3, NULL);
2724 }
2725 
/*
 * ets:setopts(Tab, Opts)
 *
 * Opts is a single option tuple or a list of option tuples. The options
 * handled are {heir, none}, {heir, Pid, HeirData} and
 * {protection, private | protected | public}; each of the two kinds may be
 * given at most once. Nothing is changed until the whole list has been
 * validated and the caller has been checked to be the table owner.
 *
 * Returns true. Fails with BADARG on malformed options, an improper list,
 * or when the caller does not own the table.
 */
BIF_RETTYPE ets_setopts_2(BIF_ALIST_2)
{
    DbTable* tb = NULL;
    Eterm* tp;
    Eterm opt;
    Eterm heir = THE_NON_VALUE;
    UWord heir_data = (UWord) THE_NON_VALUE;
    Uint32 protection = 0;
    DeclareTmpHeap(fakelist,2,BIF_P);
    Eterm tail;

    DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE, BIF_ets_setopts_2);
    /* NOTE(review): other callers use 'tb' unguarded right after
       DB_BIF_GET_TABLE, so this check looks unreachable - confirm against
       the macro definition */
    if (tb == NULL) {
        BIF_ERROR(BIF_P, BADARG | EXF_HAS_EXT_INFO);
    }

    UseTmpHeap(2,BIF_P);
    /* A lone tuple is wrapped in a one-element list built on the temporary
       heap so that both argument forms share the same loop */
    for (tail = is_tuple(BIF_ARG_2) ? CONS(fakelist, BIF_ARG_2, NIL) : BIF_ARG_2;
	  is_list(tail);
	  tail = CDR(list_val(tail))) {

	opt = CAR(list_val(tail));
	if (!is_tuple(opt) || (tp = tuple_val(opt), arityval(tp[0]) < 2)) {
	    goto badarg;
	}

	switch (tp[1]) {
	case am_heir:
	    /* A second heir option is an error */
	    if (heir != THE_NON_VALUE) goto badarg;
	    heir = tp[2];
	    if (arityval(tp[0]) == 2 && heir == am_none) {
		heir_data = am_undefined;
	    }
	    else if (arityval(tp[0]) == 3 && is_internal_pid(heir)) {
		heir_data = tp[3];
	    }
	    else goto badarg;
	    break;

	case am_protection:
	    /* Must be a 2-tuple and must not have been given already */
	    if (arityval(tp[0]) != 2 || protection != 0) goto badarg;
	    switch (tp[2]) {
	    case am_private: protection = DB_PRIVATE; break;
	    case am_protected: protection = DB_PROTECTED; break;
	    case am_public: protection = DB_PUBLIC; break;
	    default: goto badarg;
	    }
	    break;

	default: goto badarg;
	}
    }

    /* The option list must be a proper list */
    if (tail != NIL)
        goto badarg;

    if (tb->common.owner != BIF_P->common.id)
	goto badarg;

    /* heir_data still holds THE_NON_VALUE when no heir option was given */
    if (heir_data != THE_NON_VALUE) {
	free_heir_data(tb);
	set_heir(BIF_P, tb, heir, heir_data);
    }
    if (protection) {
	tb->common.status &= ~(DB_PRIVATE|DB_PROTECTED|DB_PUBLIC);
	tb->common.status |= protection;
    }

    db_unlock (tb,LCK_WRITE);
    UnUseTmpHeap(2,BIF_P);
    BIF_RET(am_true);

badarg:
    /* Common failure exit: release the temporary heap and the table lock */
    UnUseTmpHeap(2,BIF_P);
    if (tb != NULL) {
	db_unlock(tb,LCK_WRITE);
    }
    BIF_ERROR(BIF_P, BADARG);
}
2805 
/*
 * Common for delete_all_objects and select_delete(DeleteAll).
 *
 * BIF_ARG_1 identifies the table. BIF_ARG_2 is 'undefined' on the initial
 * call. When this BIF traps to itself it passes the 'nitems_holder' term
 * produced by db_delete_all_objects() as second argument instead.
 *
 * Returns the term produced by the table type's
 * db_delete_all_objects_get_nitems_from_holder() callback.
 */
BIF_RETTYPE ets_internal_delete_all_2(BIF_ALIST_2)
{
    SWord initial_reds = ERTS_BIF_REDS_LEFT(BIF_P);
    SWord reds = initial_reds;
    Eterm nitems_holder = THE_NON_VALUE;
    DbTable* tb;
    CHECK_TABLES();

    DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE, BIF_ets_internal_delete_all_2);

    if (BIF_ARG_2 == am_undefined) {
        /* Initial call: delete as much as the remaining reductions allow. */
        reds = tb->common.meth->db_delete_all_objects(BIF_P,
                                                      tb,
                                                      reds,
                                                      &nitems_holder);
        ASSERT(nitems_holder != THE_NON_VALUE);
        ASSERT(!(tb->common.status & DB_BUSY));

        if (reds < 0) {
            /*
             * Oboy, need to trap AND need to be atomic.
             * Solved by cooperative trapping where every process trying to
             * access this table (including this process) will "fail" to lookup
             * the table and instead pitch in deleting objects
             * (in delete_all_objects_continue) and then trap to self.
             */
            ASSERT((tb->common.status & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC))
                   ==
                   (tb->common.type & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC)));
            /*
             * Clear all protection bits and mark the table busy;
             * delete_all_objects_continue() restores the bits from
             * 'common.type' once the deletion has completed.
             */
            tb->common.status &= ~(DB_PRIVATE|DB_PROTECTED|DB_PUBLIC);
            tb->common.status |= DB_BUSY;
            db_unlock(tb, LCK_WRITE);
            BUMP_ALL_REDS(BIF_P);
            /* Trap to self, carrying the holder as second argument. */
            BIF_TRAP2(BIF_TRAP_EXPORT(BIF_ets_internal_delete_all_2), BIF_P,
                      BIF_ARG_1, nitems_holder);
        }
        else {
            /* Done, no trapping needed */
            BUMP_REDS(BIF_P, (initial_reds - reds));
        }

    }
    else {
        /*
         * The table lookup succeeded and second argument is nitems_holder
         * and not 'undefined', which means we have trapped at least once
         * and are now done.
         */
        nitems_holder = BIF_ARG_2;
    }
    db_unlock(tb, LCK_WRITE);
    {
    /*
     * NOTE(review): 'tb->common.meth' is read after the table lock has been
     * released. Presumably safe because the table structure outlives this
     * BIF call; confirm.
     */
    Eterm nitems =
        tb->common.meth->db_delete_all_objects_get_nitems_from_holder(BIF_P,
                                                                      nitems_holder);
    BIF_RET(nitems);
    }
}
2867 
/*
 * Pitch in on a delete-all operation that is in progress on 'tb'.
 *
 * Does nothing unless the table is marked DB_BUSY without DB_DELETE.
 * Otherwise objects are deleted using the reductions 'p' has left. If
 * that completes the job, the protection bits are restored from
 * 'common.type' and DB_BUSY is cleared; if not, all reductions of 'p'
 * are consumed.
 */
static void delete_all_objects_continue(Process* p, DbTable* tb)
{
    SWord reds_at_entry = ERTS_BIF_REDS_LEFT(p);
    SWord reds_left;

    ERTS_LC_ASSERT(DB_LOCK_FREE(tb) || erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));

    if ((tb->common.status & (DB_DELETE|DB_BUSY)) == DB_BUSY) {
        reds_left = tb->common.meth->db_delete_all_objects(p, tb,
                                                           reds_at_entry,
                                                           NULL);
        if (reds_left >= 0) {
            /* Finished: make the table accessible again. */
            tb->common.status |= tb->common.type & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC);
            tb->common.status &= ~DB_BUSY;
            BUMP_REDS(p, (reds_at_entry - reds_left));
        }
        else {
            /* Out of reductions; more work remains. */
            BUMP_ALL_REDS(p);
        }
    }
}
2889 
2890 /*
2891 ** Erase an object with given key, or maybe several objects if we have a bag
2892 ** Called as db_erase(Tab, Key), where Key is element 1 of the
2893 ** object(s) we want to erase
2894 */
ets_delete_2(BIF_ALIST_2)2895 BIF_RETTYPE ets_delete_2(BIF_ALIST_2)
2896 {
2897     DbTable* tb;
2898     int cret;
2899     Eterm ret;
2900 
2901     CHECK_TABLES();
2902 
2903     DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_delete_2);
2904 
2905     cret = tb->common.meth->db_erase(tb,BIF_ARG_2,&ret);
2906 
2907     db_unlock(tb, LCK_WRITE_REC);
2908 
2909     switch (cret) {
2910     case DB_ERROR_NONE:
2911 	BIF_RET(ret);
2912     case DB_ERROR_SYSRES:
2913 	BIF_ERROR(BIF_P, SYSTEM_LIMIT);
2914     default:
2915 	BIF_ERROR(BIF_P, BADARG);
2916     }
2917 }
2918 
2919 /*
2920 ** Erase a specific object, or maybe several objects if we have a bag
2921 */
ets_delete_object_2(BIF_ALIST_2)2922 BIF_RETTYPE ets_delete_object_2(BIF_ALIST_2)
2923 {
2924     DbTable* tb;
2925     int cret;
2926     Eterm ret;
2927 
2928     CHECK_TABLES();
2929 
2930     DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_delete_object_2);
2931 
2932     if (is_not_tuple(BIF_ARG_2) ||
2933 	(arityval(*tuple_val(BIF_ARG_2)) < tb->common.keypos)) {
2934 	db_unlock(tb, LCK_WRITE_REC);
2935 	BIF_ERROR(BIF_P, BADARG);
2936     }
2937 
2938     cret = tb->common.meth->db_erase_object(tb, BIF_ARG_2, &ret);
2939     db_unlock(tb, LCK_WRITE_REC);
2940 
2941     switch (cret) {
2942     case DB_ERROR_NONE:
2943 	BIF_RET(ret);
2944     case DB_ERROR_SYSRES:
2945 	BIF_ERROR(BIF_P, SYSTEM_LIMIT);
2946     default:
2947 	BIF_ERROR(BIF_P, BADARG);
2948     }
2949 }
2950 
2951 /*
2952 ** This is for trapping, cannot be called directly.
2953 */
ets_select_delete_trap_1(BIF_ALIST_1)2954 static BIF_RETTYPE ets_select_delete_trap_1(BIF_ALIST_1)
2955 {
2956     Process *p = BIF_P;
2957     Eterm a1 = BIF_ARG_1;
2958     BIF_RETTYPE result;
2959     DbTable* tb;
2960     int cret;
2961     Eterm ret;
2962     Eterm *tptr;
2963     db_lock_kind_t kind = LCK_WRITE_REC;
2964     enum DbIterSafety safety = ITER_SAFE;
2965 
2966     CHECK_TABLES();
2967     ASSERT(is_tuple(a1));
2968     tptr = tuple_val(a1);
2969     ASSERT(arityval(*tptr) >= 1);
2970 
2971     DB_TRAP_GET_TABLE(tb, tptr[1], DB_WRITE, kind,
2972                       &ets_select_delete_continue_exp);
2973 
2974     cret = tb->common.meth->db_select_delete_continue(p,tb,a1,&ret,&safety);
2975 
2976     if(!DID_TRAP(p,ret) && safety != ITER_SAFE) {
2977         ASSERT(erts_refc_read(&tb->common.fix_count,1));
2978         unfix_table_locked(p, tb, &kind);
2979     }
2980 
2981     db_unlock(tb, kind);
2982 
2983     switch (cret) {
2984     case DB_ERROR_NONE:
2985 	ERTS_BIF_PREP_RET(result, ret);
2986 	break;
2987     default:
2988 	ERTS_BIF_PREP_ERROR(result, p, BADARG);
2989 	break;
2990     }
2991     erts_match_set_release_result(p);
2992 
2993     return result;
2994 }
2995 
2996 
2997 /*
2998  * ets:select_delete/2 without special case for "delete-all".
2999  */
ets_internal_select_delete_2(BIF_ALIST_2)3000 BIF_RETTYPE ets_internal_select_delete_2(BIF_ALIST_2)
3001 {
3002     BIF_RETTYPE result;
3003     DbTable* tb;
3004     int cret;
3005     Eterm ret;
3006     enum DbIterSafety safety;
3007 
3008     CHECK_TABLES();
3009 
3010     DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_internal_select_delete_2);
3011 
3012     safety = ITERATION_SAFETY(BIF_P,tb);
3013     if (safety == ITER_UNSAFE) {
3014 	local_fix_table(tb);
3015     }
3016     cret = tb->common.meth->db_select_delete(BIF_P, tb, BIF_ARG_1, BIF_ARG_2,
3017                                              &ret, safety);
3018 
3019     if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
3020 	fix_table_locked(BIF_P,tb);
3021     }
3022     if (safety == ITER_UNSAFE) {
3023 	local_unfix_table(tb);
3024     }
3025     db_unlock(tb, LCK_WRITE_REC);
3026 
3027     switch (cret) {
3028     case DB_ERROR_NONE:
3029 	ERTS_BIF_PREP_RET(result, ret);
3030 	break;
3031     case DB_ERROR_SYSRES:
3032 	ERTS_BIF_PREP_ERROR(result, BIF_P, SYSTEM_LIMIT);
3033 	break;
3034     default:
3035 	ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
3036 	break;
3037     }
3038 
3039     erts_match_set_release_result(BIF_P);
3040 
3041     return result;
3042 }
3043 
/*
 * ets:all/0
 *
 * ets:all() calls ets:internal_request_all/0 which
 * requests information about all tables from
 * each scheduler thread. Each scheduler replies
 * to the calling process with information about
 * existing tables created on that specific scheduler.
 */

/*
 * One ets:all() request, shared by all schedulers.
 */
struct ErtsEtsAllReq_ {
    /*
     * Initialized to the number of schedulers by
     * ets_internal_request_all_0() and decremented by ets_all_reply()
     * for each reply sent; the request is freed when it reaches zero.
     */
    erts_atomic32_t refc;
    /* The requesting process; the replies are queued to it. */
    Process *proc;
    /* Saved copy of the reference returned to the caller; each reply
     * contains a reference made from it. */
    ErtsOIRefStorage ref;
    /* Links used when the request is queued on a scheduler; indexed by
     * scheduler number - 1. */
    ErtsEtsAllReqList list[1]; /* one per scheduler */
};

/*
 * Allocation size of a request: the struct itself, which already holds
 * one list element, plus one list element for each additional scheduler.
 */
#define ERTS_ETS_ALL_REQ_SIZE           \
    (sizeof(ErtsEtsAllReq)              \
     + (sizeof(ErtsEtsAllReqList)       \
        * (erts_no_schedulers - 1)))

/*
 * NOTE(review): this type is not referenced in the visible part of the
 * file. Its fields have the same names as the ErtsEtsAllYieldData fields
 * accessed by the functions below; confirm whether it is still needed.
 */
typedef struct {
    ErtsEtsAllReq *ongoing;
    ErlHeapFragment *hfrag;
    DbTable *tab;
    ErtsEtsAllReq *queue;
} ErtsEtsAllData;

/* Tables handled before yielding */
#define ERTS_ETS_ALL_TB_YCNT 200
/*
 * Min yield count required before starting
 * an operation that will require yield.
 */
#define ERTS_ETS_ALL_TB_YCNT_START 10

#ifdef DEBUG
/* Test yielding... */
#undef ERTS_ETS_ALL_TB_YCNT
#undef ERTS_ETS_ALL_TB_YCNT_START
#define ERTS_ETS_ALL_TB_YCNT 10
#define ERTS_ETS_ALL_TB_YCNT_START 1
#endif
3088 
/*
 * Build, or continue building, this scheduler's reply to an ets:all()
 * request and send it when complete.
 *
 * All pointer arguments are in/out:
 *   reqpp         - the request being served
 *   hfragpp       - NULL for a new operation, otherwise the heap fragment
 *                   of a yielded operation
 *   tablepp       - NULL for a new operation, otherwise the table to
 *                   continue from
 *   yield_count_p - number of tables that may be visited before yielding
 *
 * Returns 1 if the operation yielded; the state needed to restart is then
 * stored through the pointer arguments and *yield_count_p is set to 0.
 * Returns 0 when the reply message {Ref, TableList} has been queued to the
 * requesting process; *reqpp, *hfragpp and *tablepp are then reset to NULL
 * and *yield_count_p holds what is left of the yield count.
 */
static int
ets_all_reply(ErtsSchedulerData *esdp, ErtsEtsAllReq **reqpp,
              ErlHeapFragment **hfragpp, DbTable **tablepp,
              int *yield_count_p)
{
    ErtsEtsAllReq *reqp = *reqpp;
    ErlHeapFragment *hfragp = *hfragpp;
    int ycount = *yield_count_p;
    DbTable *tb, *first;
    Uint sz;
    Eterm list, msg, ref, *hp;
    ErlOffHeap *ohp;
    ErtsMessage *mp;

    /*
     * - save_sched_table() inserts at end of circular list.
     *
     * - This function scans from the end so we know that
     *   the amount of tables to scan wont grow even if we
     *   yield.
     *
     * - remove_sched_table() updates the table we yielded
     *   on if it removes it.
     */

    if (hfragp) {
        /* Restart of a yielded operation... */
        ASSERT(hfragp->used_size < hfragp->alloc_size);
        ohp = &hfragp->off_heap;
        /*
         * When yielding, the list built so far was stored in the first
         * unused word and 'used_size' was set to the number of words used.
         */
        hp = &hfragp->mem[hfragp->used_size];
        list = *hp;
        hfragp->used_size = hfragp->alloc_size;
        first = esdp->ets_tables.clist;
        tb = *tablepp;
    }
    else {
        /* A new operation... */
        ASSERT(!*tablepp);

        /* Max heap size needed... */
        /* Per table: a magic ref plus a cons cell (2 words)... */
        sz = erts_atomic_read_nob(&esdp->ets_tables.count);
        sz *= ERTS_MAGIC_REF_THING_SIZE + 2;
        /* ...and for the message: a 2-tuple (3 words) plus a ref. */
        sz += 3 + ERTS_REF_THING_SIZE;
        hfragp = new_message_buffer(sz);

        hp = &hfragp->mem[0];
        ohp = &hfragp->off_heap;
        list = NIL;
        first = esdp->ets_tables.clist;
        /* Start at the end of the circular list (see above). */
        tb = first ? first->common.all.prev : NULL;
    }

    if (tb) {
        while (1) {
            if (is_table_alive(tb)) {
                Eterm tid;
                /* Named tables are listed by name, others by magic ref. */
                if (is_table_named(tb))
                    tid = tb->common.the_name;
                else
                    tid = erts_mk_magic_ref(&hp, ohp, tb->common.btid);
                list = CONS(hp, tid, list);
                hp += 2;
            }

            /* The first table of the list is the last one to visit. */
            if (tb == first)
                break;

            tb = tb->common.all.prev;

            if (--ycount <= 0) {
                /* Save the state needed by the restart branch above. */
                sz = hp - &hfragp->mem[0];
                ASSERT(hfragp->alloc_size > sz + 1);
                *hp = list;
                hfragp->used_size = sz;
                *hfragpp = hfragp;
                *reqpp = reqp;
                *tablepp = tb;
                *yield_count_p = 0;
                return 1; /* Yield! */
            }
        }
    }

    /* All tables visited; build the message {Ref, List}. */
    ref = erts_oiref_storage_make_ref(&reqp->ref, &hp);
    msg = TUPLE2(hp, ref, list);
    hp += 3;

    sz = hp - &hfragp->mem[0];
    ASSERT(sz <= hfragp->alloc_size);

    /* Resize the fragment to the number of words actually used. */
    hfragp = erts_resize_message_buffer(hfragp, sz, &msg, 1);

    mp = erts_alloc_message(0, NULL);
    mp->data.heap_frag = hfragp;

    erts_queue_message(reqp->proc, 0, mp, msg, am_system);

    /* Drop one of the process references taken when the request was made. */
    erts_proc_dec_refc(reqp->proc);

    /* The last reply to be sent frees the request. */
    if (erts_atomic32_dec_read_nob(&reqp->refc) == 0)
        erts_free(ERTS_ALC_T_ETS_ALL_REQ, reqp);

    *reqpp = NULL;
    *hfragpp = NULL;
    *tablepp = NULL;
    *yield_count_p = ycount;

    return 0;
}
3198 
3199 int
erts_handle_yielded_ets_all_request(ErtsSchedulerData * esdp,ErtsEtsAllYieldData * eaydp)3200 erts_handle_yielded_ets_all_request(ErtsSchedulerData *esdp,
3201                                     ErtsEtsAllYieldData *eaydp)
3202 {
3203     int ix = (int) esdp->no - 1;
3204     int yc = ERTS_ETS_ALL_TB_YCNT;
3205 
3206     while (1) {
3207         if (!eaydp->ongoing) {
3208             ErtsEtsAllReq *ongoing;
3209 
3210             if (!eaydp->queue)
3211                 return 0; /* All work completed! */
3212 
3213             if (yc < ERTS_ETS_ALL_TB_YCNT_START &&
3214                 yc > erts_atomic_read_nob(&esdp->ets_tables.count))
3215                 return 1; /* Yield! */
3216 
3217             eaydp->ongoing = ongoing = eaydp->queue;
3218             if (ongoing->list[ix].next == ongoing)
3219                 eaydp->queue = NULL;
3220             else {
3221                 ongoing->list[ix].next->list[ix].prev = ongoing->list[ix].prev;
3222                 ongoing->list[ix].prev->list[ix].next = ongoing->list[ix].next;
3223                 eaydp->queue = ongoing->list[ix].next;
3224             }
3225             ASSERT(!eaydp->hfrag);
3226             ASSERT(!eaydp->tab);
3227         }
3228 
3229         if (ets_all_reply(esdp, &eaydp->ongoing, &eaydp->hfrag, &eaydp->tab, &yc))
3230             return 1; /* Yield! */
3231     }
3232 }
3233 
3234 static void
handle_ets_all_request(void * vreq)3235 handle_ets_all_request(void *vreq)
3236 {
3237     ErtsSchedulerData *esdp = erts_get_scheduler_data();
3238     ErtsEtsAllYieldData *eayp = ERTS_SCHED_AUX_YIELD_DATA(esdp, ets_all);
3239     ErtsEtsAllReq *req = (ErtsEtsAllReq *) vreq;
3240 
3241     if (!eayp->ongoing && !eayp->queue) {
3242         /* No ets:all() operations ongoing... */
3243         ErlHeapFragment *hf = NULL;
3244         DbTable *tb = NULL;
3245         int yc = ERTS_ETS_ALL_TB_YCNT;
3246         if (ets_all_reply(esdp, &req, &hf, &tb, &yc)) {
3247             /* Yielded... */
3248             ASSERT(hf);
3249             eayp->ongoing = req;
3250             eayp->hfrag = hf;
3251             eayp->tab = tb;
3252             erts_notify_new_aux_yield_work(esdp);
3253         }
3254     }
3255     else {
3256         /* Ongoing ets:all() operations; queue up this request... */
3257         int ix = (int) esdp->no - 1;
3258         if (!eayp->queue) {
3259             req->list[ix].next = req;
3260             req->list[ix].prev = req;
3261             eayp->queue = req;
3262         }
3263         else {
3264             req->list[ix].next = eayp->queue;
3265             req->list[ix].prev = eayp->queue->list[ix].prev;
3266             eayp->queue->list[ix].prev = req;
3267             req->list[ix].prev->list[ix].next = req;
3268         }
3269     }
3270 }
3271 
ets_internal_request_all_0(BIF_ALIST_0)3272 BIF_RETTYPE ets_internal_request_all_0(BIF_ALIST_0)
3273 {
3274     Eterm ref = erts_make_ref(BIF_P);
3275     ErtsEtsAllReq *req = erts_alloc(ERTS_ALC_T_ETS_ALL_REQ,
3276                                     ERTS_ETS_ALL_REQ_SIZE);
3277     erts_atomic32_init_nob(&req->refc,
3278 			       (erts_aint32_t) erts_no_schedulers);
3279     erts_oiref_storage_save(&req->ref, ref);
3280     req->proc = BIF_P;
3281     erts_proc_add_refc(BIF_P, (Sint) erts_no_schedulers);
3282 
3283     if (erts_no_schedulers > 1)
3284 	erts_schedule_multi_misc_aux_work(1,
3285 					  erts_no_schedulers,
3286                                           handle_ets_all_request,
3287 					  (void *) req);
3288 
3289     handle_ets_all_request((void *) req);
3290     BIF_RET(ref);
3291 }
3292 
3293 /*
3294 ** db_slot(Db, Slot) -> [Items].
3295 */
ets_slot_2(BIF_ALIST_2)3296 BIF_RETTYPE ets_slot_2(BIF_ALIST_2)
3297 {
3298     DbTable* tb;
3299     int cret;
3300     Eterm ret;
3301 
3302     CHECK_TABLES();
3303 
3304     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_slot_2);
3305 
3306     /* The slot number is checked in table specific code. */
3307     cret = tb->common.meth->db_slot(BIF_P, tb, BIF_ARG_2, &ret);
3308     db_unlock(tb, LCK_READ);
3309     switch (cret) {
3310     case DB_ERROR_NONE:
3311 	BIF_RET(ret);
3312     case DB_ERROR_SYSRES:
3313 	BIF_ERROR(BIF_P, SYSTEM_LIMIT);
3314     default:
3315 	BIF_ERROR(BIF_P, BADARG);
3316     }
3317 }
3318 
3319 /*
3320 ** The match BIF,  called as ets:match(Table, Pattern), ets:match(Continuation) or ets:match(Table,Pattern,ChunkSize).
3321 */
3322 
ets_match_1(BIF_ALIST_1)3323 BIF_RETTYPE ets_match_1(BIF_ALIST_1)
3324 {
3325     return ets_select1(BIF_P, BIF_ets_match_1, BIF_ARG_1);
3326 }
3327 
ets_match_2(BIF_ALIST_2)3328 BIF_RETTYPE ets_match_2(BIF_ALIST_2)
3329 {
3330     DbTable* tb;
3331     Eterm ms;
3332     DeclareTmpHeap(buff,8,BIF_P);
3333     Eterm *hp = buff;
3334     Eterm res;
3335 
3336     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_2);
3337 
3338     UseTmpHeap(8,BIF_P);
3339     ms = CONS(hp, am_DollarDollar, NIL);
3340     hp += 2;
3341     ms = TUPLE3(hp, BIF_ARG_2, NIL, ms);
3342     hp += 4;
3343     ms = CONS(hp, ms, NIL);
3344     res = ets_select2(BIF_P, tb, BIF_ARG_1, ms);
3345     UnUseTmpHeap(8,BIF_P);
3346     return res;
3347 }
3348 
ets_match_3(BIF_ALIST_3)3349 BIF_RETTYPE ets_match_3(BIF_ALIST_3)
3350 {
3351     DbTable* tb;
3352     Eterm ms;
3353     Sint chunk_size;
3354     DeclareTmpHeap(buff,8,BIF_P);
3355     Eterm *hp = buff;
3356     Eterm res;
3357 
3358     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_3);
3359 
3360     /* Chunk size strictly greater than 0 */
3361     if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
3362         db_unlock(tb, LCK_READ);
3363         BIF_ERROR(BIF_P, BADARG);
3364     }
3365 
3366     UseTmpHeap(8,BIF_P);
3367     ms = CONS(hp, am_DollarDollar, NIL);
3368     hp += 2;
3369     ms = TUPLE3(hp, BIF_ARG_2, NIL, ms);
3370     hp += 4;
3371     ms = CONS(hp, ms, NIL);
3372     res = ets_select3(BIF_P, tb, BIF_ARG_1, ms, chunk_size);
3373     UnUseTmpHeap(8,BIF_P);
3374     return res;
3375 }
3376 
3377 
ets_select_3(BIF_ALIST_3)3378 BIF_RETTYPE ets_select_3(BIF_ALIST_3)
3379 {
3380     DbTable* tb;
3381     Sint chunk_size;
3382 
3383     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_3);
3384 
3385     /* Chunk size strictly greater than 0 */
3386     if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
3387         db_unlock(tb, LCK_READ);
3388         BIF_ERROR(BIF_P, BADARG);
3389     }
3390 
3391     return ets_select3(BIF_P, tb, BIF_ARG_1, BIF_ARG_2, chunk_size);
3392 }
3393 
3394 static BIF_RETTYPE
ets_select3(Process * p,DbTable * tb,Eterm tid,Eterm ms,Sint chunk_size)3395 ets_select3(Process* p, DbTable* tb, Eterm tid, Eterm ms, Sint chunk_size)
3396 {
3397     BIF_RETTYPE result;
3398     int cret;
3399     Eterm ret;
3400     enum DbIterSafety safety;
3401 
3402     CHECK_TABLES();
3403 
3404     safety = ITERATION_SAFETY(p,tb);
3405     if (safety == ITER_UNSAFE) {
3406 	local_fix_table(tb);
3407     }
3408     cret = tb->common.meth->db_select_chunk(p, tb, tid,
3409 					    ms, chunk_size,
3410 					    0 /* not reversed */,
3411 					    &ret, safety);
3412     if (DID_TRAP(p,ret) && safety != ITER_SAFE) {
3413 	fix_table_locked(p, tb);
3414     }
3415     if (safety == ITER_UNSAFE) {
3416 	local_unfix_table(tb);
3417     }
3418     db_unlock(tb, LCK_READ);
3419 
3420     switch (cret) {
3421     case DB_ERROR_NONE:
3422 	ERTS_BIF_PREP_RET(result, ret);
3423 	break;
3424     case DB_ERROR_SYSRES:
3425 	ERTS_BIF_PREP_ERROR(result, p, SYSTEM_LIMIT);
3426 	break;
3427     default:
3428 	ERTS_BIF_PREP_ERROR(result, p, BADARG);
3429 	break;
3430     }
3431 
3432     erts_match_set_release_result(p);
3433 
3434     return result;
3435 }
3436 
3437 
3438 /* Trap here from: ets_select_1/2/3
3439  */
ets_select_trap_1(BIF_ALIST_1)3440 static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1)
3441 {
3442     Process *p = BIF_P;
3443     Eterm a1 = BIF_ARG_1;
3444     BIF_RETTYPE result;
3445     DbTable* tb;
3446     int cret;
3447     Eterm ret;
3448     Eterm *tptr;
3449     db_lock_kind_t kind = LCK_READ;
3450     enum DbIterSafety safety = ITER_SAFE;
3451 
3452     CHECK_TABLES();
3453 
3454     tptr = tuple_val(a1);
3455     ASSERT(arityval(*tptr) >= 1);
3456 
3457     DB_TRAP_GET_TABLE(tb, tptr[1], DB_READ, kind,
3458                       &ets_select_continue_exp);
3459 
3460     cret = tb->common.meth->db_select_continue(p, tb, a1, &ret, &safety);
3461 
3462     if (!DID_TRAP(p,ret)) {
3463         if (safety != ITER_SAFE) {
3464             ASSERT(erts_refc_read(&tb->common.fix_count,1));
3465             unfix_table_locked(p, tb, &kind);
3466         }
3467     }
3468     db_unlock(tb, kind);
3469 
3470     switch (cret) {
3471     case DB_ERROR_NONE:
3472 	ERTS_BIF_PREP_RET(result, ret);
3473 	break;
3474     case DB_ERROR_SYSRES:
3475 	ERTS_BIF_PREP_ERROR(result, p, SYSTEM_LIMIT);
3476 	break;
3477     default:
3478 	ERTS_BIF_PREP_ERROR(result, p, BADARG);
3479 	break;
3480     }
3481 
3482     erts_match_set_release_result(p);
3483 
3484     return result;
3485 }
3486 
3487 
ets_select_1(BIF_ALIST_1)3488 BIF_RETTYPE ets_select_1(BIF_ALIST_1)
3489 {
3490     return ets_select1(BIF_P, BIF_ets_select_1, BIF_ARG_1);
3491     /* TRAP: ets_select_trap_1 */
3492 }
3493 
3494 /*
3495  * Common impl for select/1, select_reverse/1, match/1 and match_object/1
3496  */
ets_select1(Process * p,int bif_ix,Eterm arg1)3497 static BIF_RETTYPE ets_select1(Process *p, int bif_ix, Eterm arg1)
3498 {
3499     BIF_RETTYPE result;
3500     DbTable* tb;
3501     int cret;
3502     Eterm ret;
3503     Eterm *tptr;
3504     enum DbIterSafety safety, safety_copy;
3505 
3506     CHECK_TABLES();
3507 
3508     /*
3509      * Make sure that the table exists.
3510      */
3511 
3512     if (!is_tuple(arg1)) {
3513 	if (arg1 == am_EOT) {
3514 	    BIF_RET(am_EOT);
3515 	}
3516 	BIF_ERROR(p, BADARG);
3517     }
3518     tptr = tuple_val(arg1);
3519     if (arityval(*tptr) < 1)
3520         BIF_ERROR(p, BADARG);
3521 
3522     DB_GET_TABLE(tb, tptr[1], DB_READ, LCK_READ, bif_ix, NULL, p);
3523 
3524     safety = ITERATION_SAFETY(p,tb);
3525     if (safety == ITER_UNSAFE) {
3526 	local_fix_table(tb);
3527     }
3528 
3529     safety_copy = safety;
3530     cret = tb->common.meth->db_select_continue(p,tb, arg1, &ret, &safety_copy);
3531 
3532     if (DID_TRAP(p,ret) && safety != ITER_SAFE) {
3533 	fix_table_locked(p, tb);
3534     }
3535     if (safety == ITER_UNSAFE) {
3536 	local_unfix_table(tb);
3537     }
3538     db_unlock(tb, LCK_READ);
3539 
3540     switch (cret) {
3541     case DB_ERROR_NONE:
3542 	ERTS_BIF_PREP_RET(result, ret);
3543 	break;
3544     case DB_ERROR_SYSRES:
3545 	ERTS_BIF_PREP_ERROR(result, p, SYSTEM_LIMIT);
3546 	break;
3547     default:
3548 	ERTS_BIF_PREP_ERROR(result, p, BADARG);
3549 	break;
3550     }
3551 
3552     erts_match_set_release_result(p);
3553 
3554     return result;
3555 }
3556 
ets_select_2(BIF_ALIST_2)3557 BIF_RETTYPE ets_select_2(BIF_ALIST_2)
3558 {
3559     DbTable* tb;
3560     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_2);
3561     return ets_select2(BIF_P, tb, BIF_ARG_1, BIF_ARG_2);
3562     /* TRAP: ets_select_trap_1 */
3563 }
3564 
3565 static BIF_RETTYPE
ets_select2(Process * p,DbTable * tb,Eterm tid,Eterm ms)3566 ets_select2(Process* p, DbTable* tb, Eterm tid, Eterm ms)
3567 {
3568     BIF_RETTYPE result;
3569     int cret;
3570     enum DbIterSafety safety;
3571     Eterm ret;
3572 
3573     CHECK_TABLES();
3574 
3575     safety = ITERATION_SAFETY(p,tb);
3576     if (safety == ITER_UNSAFE) {
3577 	local_fix_table(tb);
3578     }
3579 
3580     cret = tb->common.meth->db_select(p, tb, tid, ms, 0, &ret, safety);
3581 
3582     if (DID_TRAP(p,ret) && safety != ITER_SAFE) {
3583 	fix_table_locked(p, tb);
3584     }
3585     if (safety == ITER_UNSAFE) {
3586 	local_unfix_table(tb);
3587     }
3588     db_unlock(tb, LCK_READ);
3589 
3590     switch (cret) {
3591     case DB_ERROR_NONE:
3592 	ERTS_BIF_PREP_RET(result, ret);
3593 	break;
3594     case DB_ERROR_SYSRES:
3595 	ERTS_BIF_PREP_ERROR(result, p, SYSTEM_LIMIT);
3596 	break;
3597     default:
3598 	ERTS_BIF_PREP_ERROR(result, p, BADARG);
3599 	break;
3600     }
3601 
3602     erts_match_set_release_result(p);
3603 
3604     return result;
3605 }
3606 
3607 /* We get here instead of in the real BIF when trapping */
ets_select_count_1(BIF_ALIST_1)3608 static BIF_RETTYPE ets_select_count_1(BIF_ALIST_1)
3609 {
3610     Process *p = BIF_P;
3611     Eterm a1 = BIF_ARG_1;
3612     BIF_RETTYPE result;
3613     DbTable* tb;
3614     int cret;
3615     Eterm ret;
3616     Eterm *tptr;
3617     db_lock_kind_t kind = LCK_READ;
3618     enum DbIterSafety safety = ITER_SAFE;
3619 
3620     CHECK_TABLES();
3621 
3622     tptr = tuple_val(a1);
3623     ASSERT(arityval(*tptr) >= 1);
3624 
3625     DB_TRAP_GET_TABLE(tb, tptr[1], DB_READ, kind,
3626                       &ets_select_count_continue_exp);
3627 
3628     cret = tb->common.meth->db_select_count_continue(p, tb, a1, &ret, &safety);
3629 
3630     if (!DID_TRAP(p,ret) && safety != ITER_SAFE) {
3631         ASSERT(erts_refc_read(&tb->common.fix_count,1));
3632 	unfix_table_locked(p, tb, &kind);
3633     }
3634     db_unlock(tb, kind);
3635 
3636     switch (cret) {
3637     case DB_ERROR_NONE:
3638 	ERTS_BIF_PREP_RET(result, ret);
3639 	break;
3640     case DB_ERROR_SYSRES:
3641 	ERTS_BIF_PREP_ERROR(result, p, SYSTEM_LIMIT);
3642 	break;
3643     default:
3644 	ERTS_BIF_PREP_ERROR(result, p, BADARG);
3645 	break;
3646     }
3647 
3648     erts_match_set_release_result(p);
3649 
3650     return result;
3651 }
3652 
ets_select_count_2(BIF_ALIST_2)3653 BIF_RETTYPE ets_select_count_2(BIF_ALIST_2)
3654 {
3655     BIF_RETTYPE result;
3656     DbTable* tb;
3657     int cret;
3658     enum DbIterSafety safety;
3659     Eterm ret;
3660 
3661     CHECK_TABLES();
3662 
3663     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_count_2);
3664 
3665     safety = ITERATION_SAFETY(BIF_P,tb);
3666     if (safety == ITER_UNSAFE) {
3667 	local_fix_table(tb);
3668     }
3669     cret = tb->common.meth->db_select_count(BIF_P,tb, BIF_ARG_1, BIF_ARG_2,
3670                                             &ret, safety);
3671 
3672     if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
3673 	fix_table_locked(BIF_P, tb);
3674     }
3675     if (safety == ITER_UNSAFE) {
3676 	local_unfix_table(tb);
3677     }
3678     db_unlock(tb, LCK_READ);
3679     switch (cret) {
3680     case DB_ERROR_NONE:
3681 	ERTS_BIF_PREP_RET(result, ret);
3682 	break;
3683     case DB_ERROR_SYSRES:
3684 	ERTS_BIF_PREP_ERROR(result, BIF_P, SYSTEM_LIMIT);
3685 	break;
3686     default:
3687 	ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
3688 	break;
3689     }
3690 
3691     erts_match_set_release_result(BIF_P);
3692 
3693     return result;
3694 }
3695 
3696 /*
3697  ** This is for trapping, cannot be called directly.
3698  */
ets_select_replace_1(BIF_ALIST_1)3699 static BIF_RETTYPE ets_select_replace_1(BIF_ALIST_1)
3700 {
3701     Process *p = BIF_P;
3702     Eterm a1 = BIF_ARG_1;
3703     BIF_RETTYPE result;
3704     DbTable* tb;
3705     int cret;
3706     Eterm ret;
3707     Eterm *tptr;
3708     db_lock_kind_t kind = LCK_WRITE_REC;
3709     enum DbIterSafety safety = ITER_SAFE;
3710 
3711     CHECK_TABLES();
3712     ASSERT(is_tuple(a1));
3713     tptr = tuple_val(a1);
3714     ASSERT(arityval(*tptr) >= 1);
3715 
3716     DB_TRAP_GET_TABLE(tb, tptr[1], DB_WRITE, kind,
3717                       &ets_select_replace_continue_exp);
3718 
3719     cret = tb->common.meth->db_select_replace_continue(p,tb,a1,&ret,&safety);
3720 
3721     if(!DID_TRAP(p,ret) && safety != ITER_SAFE) {
3722         ASSERT(erts_refc_read(&tb->common.fix_count,1));
3723         unfix_table_locked(p, tb, &kind);
3724     }
3725 
3726     db_unlock(tb, kind);
3727 
3728     switch (cret) {
3729     case DB_ERROR_NONE:
3730         ERTS_BIF_PREP_RET(result, ret);
3731         break;
3732     default:
3733         ERTS_BIF_PREP_ERROR(result, p, BADARG);
3734         break;
3735     }
3736     erts_match_set_release_result(p);
3737 
3738     return result;
3739 }
3740 
3741 
ets_select_replace_2(BIF_ALIST_2)3742 BIF_RETTYPE ets_select_replace_2(BIF_ALIST_2)
3743 {
3744     BIF_RETTYPE result;
3745     DbTable* tb;
3746     int cret;
3747     Eterm ret;
3748     enum DbIterSafety safety;
3749 
3750     CHECK_TABLES();
3751 
3752     DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_select_replace_2);
3753 
3754     if (tb->common.status & DB_BAG) {
3755         /* Bag implementation presented both semantic consistency
3756            and performance issues */
3757         db_unlock(tb, LCK_WRITE_REC);
3758         BIF_P->fvalue = EXI_TAB_TYPE;
3759         BIF_ERROR(BIF_P, BADARG | EXF_HAS_EXT_INFO);
3760     }
3761 
3762     safety = ITERATION_SAFETY(BIF_P,tb);
3763     if (safety == ITER_UNSAFE) {
3764         local_fix_table(tb);
3765     }
3766     cret = tb->common.meth->db_select_replace(BIF_P, tb, BIF_ARG_1, BIF_ARG_2,
3767                                               &ret, safety);
3768 
3769     if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
3770         fix_table_locked(BIF_P,tb);
3771     }
3772     if (safety == ITER_UNSAFE) {
3773         local_unfix_table(tb);
3774     }
3775     db_unlock(tb, LCK_WRITE_REC);
3776 
3777     switch (cret) {
3778     case DB_ERROR_NONE:
3779         ERTS_BIF_PREP_RET(result, ret);
3780         break;
3781     case DB_ERROR_SYSRES:
3782         ERTS_BIF_PREP_ERROR(result, BIF_P, SYSTEM_LIMIT);
3783         break;
3784     default:
3785         ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
3786         break;
3787     }
3788 
3789     erts_match_set_release_result(BIF_P);
3790 
3791     return result;
3792 }
3793 
3794 
ets_select_reverse_3(BIF_ALIST_3)3795 BIF_RETTYPE ets_select_reverse_3(BIF_ALIST_3)
3796 {
3797     BIF_RETTYPE result;
3798     DbTable* tb;
3799     int cret;
3800     enum DbIterSafety safety;
3801     Eterm ret;
3802     Sint chunk_size;
3803 
3804     CHECK_TABLES();
3805 
3806     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_reverse_3);
3807 
3808     /* Chunk size strictly greater than 0 */
3809     if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
3810 	db_unlock(tb, LCK_READ);
3811 	BIF_ERROR(BIF_P, BADARG);
3812     }
3813     safety = ITERATION_SAFETY(BIF_P,tb);
3814     if (safety == ITER_UNSAFE) {
3815 	local_fix_table(tb);
3816     }
3817     cret = tb->common.meth->db_select_chunk(BIF_P,tb, BIF_ARG_1,
3818 					    BIF_ARG_2, chunk_size,
3819 					    1 /* reversed */, &ret, safety);
3820     if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
3821 	fix_table_locked(BIF_P, tb);
3822     }
3823     if (safety == ITER_UNSAFE) {
3824 	local_unfix_table(tb);
3825     }
3826     db_unlock(tb, LCK_READ);
3827     switch (cret) {
3828     case DB_ERROR_NONE:
3829 	ERTS_BIF_PREP_RET(result, ret);
3830 	break;
3831     case DB_ERROR_SYSRES:
3832 	ERTS_BIF_PREP_ERROR(result, BIF_P, SYSTEM_LIMIT);
3833 	break;
3834     default:
3835 	ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
3836 	break;
3837     }
3838     erts_match_set_release_result(BIF_P);
3839     return result;
3840 }
3841 
ets_select_reverse_1(BIF_ALIST_1)3842 BIF_RETTYPE ets_select_reverse_1(BIF_ALIST_1)
3843 {
3844     return ets_select1(BIF_P, BIF_ets_select_reverse_1, BIF_ARG_1);
3845 }
3846 
ets_select_reverse_2(BIF_ALIST_2)3847 BIF_RETTYPE ets_select_reverse_2(BIF_ALIST_2)
3848 {
3849     BIF_RETTYPE result;
3850     DbTable* tb;
3851     int cret;
3852     enum DbIterSafety safety;
3853     Eterm ret;
3854 
3855     CHECK_TABLES();
3856 
3857     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_reverse_2);
3858 
3859     safety = ITERATION_SAFETY(BIF_P,tb);
3860     if (safety == ITER_UNSAFE) {
3861 	local_fix_table(tb);
3862     }
3863     cret = tb->common.meth->db_select(BIF_P,tb, BIF_ARG_1, BIF_ARG_2,
3864 				      1 /*reversed*/, &ret, safety);
3865 
3866     if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) {
3867 	fix_table_locked(BIF_P, tb);
3868     }
3869     if (safety == ITER_UNSAFE) {
3870 	local_unfix_table(tb);
3871     }
3872     db_unlock(tb, LCK_READ);
3873     switch (cret) {
3874     case DB_ERROR_NONE:
3875 	ERTS_BIF_PREP_RET(result, ret);
3876 	break;
3877     case DB_ERROR_SYSRES:
3878 	ERTS_BIF_PREP_ERROR(result, BIF_P, SYSTEM_LIMIT);
3879 	break;
3880     default:
3881 	ERTS_BIF_PREP_ERROR(result, BIF_P, BADARG);
3882 	break;
3883     }
3884     erts_match_set_release_result(BIF_P);
3885     return result;
3886 }
3887 
3888 
3889 /*
3890 ** ets:match_object(Continuation)
3891 */
ets_match_object_1(BIF_ALIST_1)3892 BIF_RETTYPE ets_match_object_1(BIF_ALIST_1)
3893 {
3894     return ets_select1(BIF_P, BIF_ets_match_object_1, BIF_ARG_1);
3895 }
3896 
3897 /*
3898 ** ets:match_object(Table, Pattern)
3899 */
ets_match_object_2(BIF_ALIST_2)3900 BIF_RETTYPE ets_match_object_2(BIF_ALIST_2)
3901 {
3902     DbTable* tb;
3903     Eterm ms;
3904     DeclareTmpHeap(buff,8,BIF_P);
3905     Eterm *hp = buff;
3906     Eterm res;
3907 
3908     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_object_2);
3909 
3910     UseTmpHeap(8,BIF_P);
3911     ms = CONS(hp, am_DollarUnderscore, NIL);
3912     hp += 2;
3913     ms = TUPLE3(hp, BIF_ARG_2, NIL, ms);
3914     hp += 4;
3915     ms = CONS(hp, ms, NIL);
3916     res = ets_select2(BIF_P, tb, BIF_ARG_1, ms);
3917     UnUseTmpHeap(8,BIF_P);
3918     return res;
3919 }
3920 
3921 /*
3922 ** ets:match_object(Table,Pattern,ChunkSize)
3923 */
ets_match_object_3(BIF_ALIST_3)3924 BIF_RETTYPE ets_match_object_3(BIF_ALIST_3)
3925 {
3926     DbTable* tb;
3927     Sint chunk_size;
3928     Eterm ms;
3929     DeclareTmpHeap(buff,8,BIF_P);
3930     Eterm *hp = buff;
3931     Eterm res;
3932 
3933     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_object_3);
3934 
3935     /* Chunk size strictly greater than 0 */
3936     if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
3937         db_unlock(tb, LCK_READ);
3938         BIF_ERROR(BIF_P, BADARG);
3939     }
3940 
3941     UseTmpHeap(8,BIF_P);
3942     ms = CONS(hp, am_DollarUnderscore, NIL);
3943     hp += 2;
3944     ms = TUPLE3(hp, BIF_ARG_2, NIL, ms);
3945     hp += 4;
3946     ms = CONS(hp, ms, NIL);
3947     res = ets_select3(BIF_P, tb, BIF_ARG_1, ms, chunk_size);
3948     UnUseTmpHeap(8,BIF_P);
3949     return res;
3950 }
3951 
3952 /*
3953  * BIF to extract information about a particular table.
3954  */
3955 
ets_info_1(BIF_ALIST_1)3956 BIF_RETTYPE ets_info_1(BIF_ALIST_1)
3957 {
3958     static Eterm fields[] = {am_protection, am_keypos, am_type, am_named_table,
3959                              am_node, am_size, am_name, am_heir, am_owner, am_memory, am_compressed,
3960                              am_write_concurrency,
3961                              am_read_concurrency,
3962                              am_decentralized_counters,
3963                              am_id};
3964     Eterm results[sizeof(fields)/sizeof(Eterm)];
3965     DbTable* tb;
3966     Eterm res;
3967     int i;
3968     Eterm* hp;
3969     Uint freason;
3970     Sint size = -1;
3971     Sint memory = -1;
3972     Eterm table;
3973     int is_ctrs_read_result_set = 0;
3974     /*Process* rp = NULL;*/
3975     /* If/when we implement lockless private tables:
3976     Eterm owner;
3977     */
3978     if(is_tuple(BIF_ARG_1) &&
3979        is_tuple_arity(BIF_ARG_1, 2) &&
3980        erts_flxctr_is_snapshot_result(tuple_val(BIF_ARG_1)[1])) {
3981         Eterm counter_read_result  = tuple_val(BIF_ARG_1)[1];
3982         table = tuple_val(BIF_ARG_1)[2];
3983         size = erts_flxctr_get_snapshot_result_after_trap(counter_read_result,
3984                                                           ERTS_DB_TABLE_NITEMS_COUNTER_ID);
3985         memory = erts_flxctr_get_snapshot_result_after_trap(counter_read_result,
3986                                                             ERTS_DB_TABLE_MEM_COUNTER_ID);
3987         is_ctrs_read_result_set = 1;
3988     } else {
3989         table = BIF_ARG_1;
3990     }
3991     if ((tb = db_get_table(BIF_P, table, DB_INFO, LCK_READ, &freason)) == NULL) {
3992         if (BIF_P->fvalue == EXI_TYPE) {
3993             /* TRAP or invalid table identifier (not atom or magic reference). */
3994             return db_bif_fail(BIF_P, freason, BIF_ets_info_1, NULL);
3995         } else {
3996             /* The table no longer exists. */
3997             BIF_RET(am_undefined);
3998         }
3999     }
4000 
4001     /* If/when we implement lockless private tables:
4002     owner = tb->common.owner;
4003     */
4004 
4005     /* If/when we implement lockless private tables:
4006     if ((tb->common.status & DB_PRIVATE) && owner != BIF_P->common.id) {
4007 	db_unlock(tb, LCK_READ);
4008 	rp = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN,
4009 				       owner, ERTS_PROC_LOCK_MAIN);
4010 	if (rp == NULL) {
4011 	    BIF_RET(am_undefined);
4012 	}
4013 	if (rp == ERTS_PROC_LOCK_BUSY) {
4014 	    ERTS_BIF_YIELD1(BIF_TRAP_EXPORT(BIF_ets_info_1), BIF_P, BIF_ARG_1);
4015 	}
4016 	if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ)) == NULL
4017 	    || tb->common.owner != owner) {
4018 	    if (BIF_P != rp)
4019 		erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
4020 	    if (is_atom(BIF_ARG_1) || is_small(BIF_ARG_1)) {
4021 		BIF_RET(am_undefined);
4022 	    }
4023 	    BIF_ERROR(BIF_P, BADARG);
4024 	}
4025     }*/
4026 
4027     if (!is_ctrs_read_result_set) {
4028         ErtsFlxCtrSnapshotResult res =
4029             erts_flxctr_snapshot(&tb->common.counters, ERTS_ALC_T_ETS_CTRS, BIF_P);
4030         if (ERTS_FLXCTR_GET_RESULT_AFTER_TRAP == res.type) {
4031             Eterm tuple;
4032             db_unlock(tb, LCK_READ);
4033             hp = HAlloc(BIF_P, 3);
4034             tuple = TUPLE2(hp, res.trap_resume_state, table);
4035             BIF_TRAP1(BIF_TRAP_EXPORT(BIF_ets_info_1), BIF_P, tuple);
4036         } else if (res.type == ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP) {
4037             db_unlock(tb, LCK_READ);
4038             BIF_TRAP1(BIF_TRAP_EXPORT(BIF_ets_info_1), BIF_P, table);
4039         } else {
4040             size = res.result[ERTS_DB_TABLE_NITEMS_COUNTER_ID];
4041             memory = res.result[ERTS_DB_TABLE_MEM_COUNTER_ID];
4042             is_ctrs_read_result_set = 1;
4043         }
4044     }
4045     for (i = 0; i < sizeof(fields)/sizeof(Eterm); i++) {
4046         if (is_ctrs_read_result_set && am_size == fields[i]) {
4047             results[i] = erts_make_integer(size, BIF_P);
4048         } else if (is_ctrs_read_result_set && am_memory == fields[i]) {
4049             Sint words = (Sint) ((memory + sizeof(Sint) - 1) / sizeof(Sint));
4050             results[i] = erts_make_integer(words, BIF_P);
4051         } else {
4052             results[i] = table_info(BIF_P, tb, fields[i]);
4053             ASSERT(is_value(results[i]));
4054         }
4055     }
4056     db_unlock(tb, LCK_READ);
4057 
4058     /*if (rp != NULL && rp != BIF_P)
4059 	erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);*/
4060 
4061     hp = HAlloc(BIF_P, 5*sizeof(fields)/sizeof(Eterm));
4062     res = NIL;
4063     for (i = 0; i < sizeof(fields)/sizeof(Eterm); i++) {
4064 	Eterm tuple;
4065 	tuple = TUPLE2(hp, fields[i], results[i]);
4066 	hp += 3;
4067 	res = CONS(hp, tuple, res);
4068 	hp += 2;
4069     }
4070     BIF_RET(res);
4071 }
4072 
4073 /*
4074  * BIF to extract information about a particular table.
4075  */
4076 
ets_info_2(BIF_ALIST_2)4077 BIF_RETTYPE ets_info_2(BIF_ALIST_2)
4078 {
4079     DbTable* tb;
4080     Eterm ret = THE_NON_VALUE;
4081     Uint freason;
4082     if (erts_flxctr_is_snapshot_result(BIF_ARG_1)) {
4083         Sint res;
4084         if (am_memory == BIF_ARG_2) {
4085             res = erts_flxctr_get_snapshot_result_after_trap(BIF_ARG_1,
4086                                                              ERTS_DB_TABLE_MEM_COUNTER_ID);
4087             res = (Sint) ((res + sizeof(Sint) - 1) / sizeof(Sint));
4088         } else {
4089             res = erts_flxctr_get_snapshot_result_after_trap(BIF_ARG_1,
4090                                                              ERTS_DB_TABLE_NITEMS_COUNTER_ID);
4091         }
4092         BIF_RET(erts_make_integer(res, BIF_P));
4093     }
4094 
4095     if (BIF_ARG_2 == am_binary)
4096         BIF_TRAP1(ets_info_binary_trap, BIF_P, BIF_ARG_1);
4097 
4098     if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ, &freason)) == NULL) {
4099         if (BIF_P->fvalue == EXI_TYPE) {
4100             /* TRAP or invalid table identifier (not atom or magic reference). */
4101             return db_bif_fail(BIF_P, freason, BIF_ets_info_2, NULL);
4102         } else {
4103             /* The table no longer exists. */
4104             BIF_RET(am_undefined);
4105         }
4106     }
4107     if (BIF_ARG_2 == am_size || BIF_ARG_2 == am_memory) {
4108         ErtsFlxCtrSnapshotResult res =
4109             erts_flxctr_snapshot(&tb->common.counters, ERTS_ALC_T_ETS_CTRS, BIF_P);
4110         if (ERTS_FLXCTR_GET_RESULT_AFTER_TRAP == res.type) {
4111             db_unlock(tb, LCK_READ);
4112             BIF_TRAP2(BIF_TRAP_EXPORT(BIF_ets_info_2), BIF_P, res.trap_resume_state, BIF_ARG_2);
4113         } else if (res.type == ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP) {
4114             db_unlock(tb, LCK_READ);
4115             BIF_TRAP2(BIF_TRAP_EXPORT(BIF_ets_info_2), BIF_P, BIF_ARG_1, BIF_ARG_2);
4116         } else if (BIF_ARG_2 == am_size) {
4117             ret = erts_make_integer(res.result[ERTS_DB_TABLE_NITEMS_COUNTER_ID], BIF_P);
4118         } else { /* BIF_ARG_2 == am_memory */
4119             Sint r = res.result[ERTS_DB_TABLE_MEM_COUNTER_ID];
4120             r = (Sint) ((r + sizeof(Sint) - 1) / sizeof(Sint));
4121             ret = erts_make_integer(r, BIF_P);
4122         }
4123     } else {
4124         ret = table_info(BIF_P, tb, BIF_ARG_2);
4125     }
4126     db_unlock(tb, LCK_READ);
4127     if (is_non_value(ret)) {
4128 	BIF_ERROR(BIF_P, BADARG);
4129     }
4130     BIF_RET(ret);
4131 }
4132 
4133 
ets_is_compiled_ms_1(BIF_ALIST_1)4134 BIF_RETTYPE ets_is_compiled_ms_1(BIF_ALIST_1)
4135 {
4136     if (erts_db_get_match_prog_binary(BIF_ARG_1)) {
4137 	BIF_RET(am_true);
4138     } else {
4139 	BIF_RET(am_false);
4140     }
4141 }
4142 
ets_match_spec_compile_1(BIF_ALIST_1)4143 BIF_RETTYPE ets_match_spec_compile_1(BIF_ALIST_1)
4144 {
4145     Uint freason;
4146     Binary *mp = db_match_set_compile(BIF_P, BIF_ARG_1, DCOMP_TABLE, &freason);
4147     Eterm *hp;
4148     if (mp == NULL) {
4149 	BIF_ERROR(BIF_P, freason);
4150     }
4151 
4152     hp = HAlloc(BIF_P, ERTS_MAGIC_REF_THING_SIZE);
4153 
4154     BIF_RET(erts_db_make_match_prog_ref(BIF_P, mp, &hp));
4155 }
4156 
ets_match_spec_run_r_3(BIF_ALIST_3)4157 BIF_RETTYPE ets_match_spec_run_r_3(BIF_ALIST_3)
4158 {
4159     Eterm ret = BIF_ARG_3;
4160     int i = 0;
4161     Eterm *hp;
4162     Eterm lst;
4163     Binary *mp;
4164     Eterm res;
4165     Uint32 dummy;
4166 
4167     if (!(is_list(BIF_ARG_1) || BIF_ARG_1 == NIL)) {
4168     error:
4169 	BIF_ERROR(BIF_P, BADARG);
4170     }
4171 
4172     mp = erts_db_get_match_prog_binary(BIF_ARG_2);
4173     if (!mp)
4174 	goto error;
4175 
4176     if (BIF_ARG_1 == NIL) {
4177 	BIF_RET(BIF_ARG_3);
4178     }
4179     for (lst = BIF_ARG_1; is_list(lst); lst = CDR(list_val(lst))) {
4180 	if (++i > CONTEXT_REDS) {
4181 	    BUMP_ALL_REDS(BIF_P);
4182 	    BIF_TRAP3(BIF_TRAP_EXPORT(BIF_ets_match_spec_run_r_3),
4183 		      BIF_P,lst,BIF_ARG_2,ret);
4184 	}
4185 	res = db_prog_match(BIF_P, BIF_P,
4186                             mp, CAR(list_val(lst)), NULL, 0,
4187 			    ERTS_PAM_COPY_RESULT, &dummy);
4188 	if (is_value(res)) {
4189 	    hp = HAlloc(BIF_P, 2);
4190 	    ret = CONS(hp,res,ret);
4191 	    /*hp += 2;*/
4192 	}
4193     }
4194     if (lst != NIL) {
4195 	goto error;
4196     }
4197     BIF_RET2(ret,i);
4198 }
4199 
erts_internal_ets_lookup_binary_info_2(BIF_ALIST_2)4200 BIF_RETTYPE erts_internal_ets_lookup_binary_info_2(BIF_ALIST_2)
4201 {
4202     DbTable* tb;
4203     int cret;
4204     Eterm ret;
4205 
4206     CHECK_TABLES();
4207 
4208     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_erts_internal_ets_lookup_binary_info_2);
4209 
4210     cret = tb->common.meth->db_get_binary_info(BIF_P, tb, BIF_ARG_2, &ret);
4211 
4212     db_unlock(tb, LCK_READ);
4213 
4214     switch (cret) {
4215     case DB_ERROR_NONE:
4216 	BIF_RET(ret);
4217     case DB_ERROR_SYSRES:
4218 	BIF_ERROR(BIF_P, SYSTEM_LIMIT);
4219     default:
4220 	BIF_ERROR(BIF_P, BADARG);
4221     }
4222 }
4223 
erts_internal_ets_raw_first_1(BIF_ALIST_1)4224 BIF_RETTYPE erts_internal_ets_raw_first_1(BIF_ALIST_1)
4225 {
4226     DbTable* tb;
4227     int cret;
4228     Eterm ret;
4229 
4230     CHECK_TABLES();
4231 
4232     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_erts_internal_ets_raw_first_1);
4233 
4234     cret = tb->common.meth->db_raw_first(BIF_P, tb, &ret);
4235 
4236     db_unlock(tb, LCK_READ);
4237 
4238     if (cret != DB_ERROR_NONE) {
4239 	BIF_ERROR(BIF_P, BADARG);
4240     }
4241     BIF_RET(ret);
4242 }
4243 
erts_internal_ets_raw_next_2(BIF_ALIST_2)4244 BIF_RETTYPE erts_internal_ets_raw_next_2(BIF_ALIST_2)
4245 {
4246     DbTable* tb;
4247     int cret;
4248     Eterm ret;
4249 
4250     CHECK_TABLES();
4251 
4252     DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_erts_internal_ets_raw_next_2);
4253 
4254     cret = tb->common.meth->db_raw_next(BIF_P, tb, BIF_ARG_2, &ret);
4255 
4256     db_unlock(tb, LCK_READ);
4257 
4258     if (cret != DB_ERROR_NONE) {
4259 	BIF_ERROR(BIF_P, BADARG);
4260     }
4261     BIF_RET(ret);
4262 }
4263 
4264 BIF_RETTYPE
erts_internal_ets_super_user_1(BIF_ALIST_1)4265 erts_internal_ets_super_user_1(BIF_ALIST_1)
4266 {
4267     if (BIF_ARG_1 == am_true)
4268         BIF_P->flags |= F_ETS_SUPER_USER;
4269     else if (BIF_ARG_1 == am_false)
4270         BIF_P->flags &= ~F_ETS_SUPER_USER;
4271     else
4272 	BIF_ERROR(BIF_P, BADARG);
4273     BIF_RET(am_ok);
4274 }
4275 
4276 /*
4277 ** External interface (NOT BIF's)
4278 */
4279 
/*
 * Spin count selected by init_db() from its ErtsDbSpinCount argument.
 * A negative value (the initial value, and what init_db() stores for the
 * normal/default level) means "no explicit spin count": init_db() then
 * leaves the main_spincount field of its rwmutex options untouched.
 */
int erts_ets_rwmtx_spin_count = -1;
4281 
4282 /* Init the db */
4283 
init_db(ErtsDbSpinCount db_spin_count)4284 void init_db(ErtsDbSpinCount db_spin_count)
4285 {
4286     int i;
4287     unsigned bits;
4288     size_t size;
4289 
4290     int max_spin_count = (1 << 15) - 1; /* internal limit */
4291     erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
4292     rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
4293     rwmtx_opt.lived = ERTS_RWMTX_LONG_LIVED;
4294 
4295     switch (db_spin_count) {
4296     case ERTS_DB_SPNCNT_NONE:
4297 	erts_ets_rwmtx_spin_count = 0;
4298 	break;
4299     case ERTS_DB_SPNCNT_VERY_LOW:
4300 	erts_ets_rwmtx_spin_count = 100;
4301 	break;
4302     case ERTS_DB_SPNCNT_LOW:
4303 	erts_ets_rwmtx_spin_count = 200;
4304 	erts_ets_rwmtx_spin_count += erts_no_schedulers * 50;
4305 	if (erts_ets_rwmtx_spin_count > 1000)
4306 	    erts_ets_rwmtx_spin_count = 1000;
4307 	break;
4308     case ERTS_DB_SPNCNT_HIGH:
4309 	erts_ets_rwmtx_spin_count = 2000;
4310 	erts_ets_rwmtx_spin_count += erts_no_schedulers * 100;
4311 	if (erts_ets_rwmtx_spin_count > 15000)
4312 	    erts_ets_rwmtx_spin_count = 15000;
4313 	break;
4314     case ERTS_DB_SPNCNT_VERY_HIGH:
4315 	erts_ets_rwmtx_spin_count = 15000;
4316 	erts_ets_rwmtx_spin_count += erts_no_schedulers * 500;
4317 	if (erts_ets_rwmtx_spin_count > max_spin_count)
4318 	    erts_ets_rwmtx_spin_count = max_spin_count;
4319 	break;
4320     case ERTS_DB_SPNCNT_EXTREMELY_HIGH:
4321 	erts_ets_rwmtx_spin_count = max_spin_count;
4322 	break;
4323     case ERTS_DB_SPNCNT_NORMAL:
4324     default:
4325 	erts_ets_rwmtx_spin_count = -1;
4326 	break;
4327     }
4328 
4329     if (erts_ets_rwmtx_spin_count >= 0)
4330 	rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
4331 
4332     for (i=0; i<META_NAME_TAB_LOCK_CNT; i++) {
4333         erts_rwmtx_init_opt(&meta_name_tab_rwlocks[i].lck, &rwmtx_opt,
4334             "meta_name_tab", make_small(i),
4335             ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DB);
4336     }
4337 
4338     erts_atomic_init_nob(&erts_ets_misc_mem_size, 0);
4339     db_initialize_util();
4340 
4341     if (user_requested_db_max_tabs < DB_DEF_MAX_TABS)
4342 	db_max_tabs = DB_DEF_MAX_TABS;
4343     else
4344 	db_max_tabs = user_requested_db_max_tabs;
4345 
4346     bits = erts_fit_in_bits_int32(db_max_tabs-1);
4347     if (bits > SMALL_BITS) {
4348 	erts_exit(ERTS_ERROR_EXIT,"Max limit for ets tabled too high %u (max %u).",
4349 		 db_max_tabs, ((Uint)1)<<SMALL_BITS);
4350     }
4351 
4352     /*
4353      * We don't have ony hard limit for number of tables anymore,                                                                            .
4354      * but we use 'db_max_tabs' to determine size of name hash table.
4355      */
4356     meta_name_tab_mask = (((Uint) 1)<<bits) - 1;
4357     size = sizeof(struct meta_name_tab_entry)*(meta_name_tab_mask+1);
4358     meta_name_tab = erts_db_alloc_nt(ERTS_ALC_T_DB_TABLES, size);
4359     ERTS_ETS_MISC_MEM_ADD(size);
4360 
4361     for (i=0; i<=meta_name_tab_mask; i++) {
4362 	meta_name_tab[i].pu.tb = NULL;
4363 	meta_name_tab[i].u.name_atom = NIL;
4364     }
4365 
4366     db_initialize_hash();
4367     db_initialize_tree();
4368     db_initialize_catree();
4369 
4370     /* Non visual BIF to trap to. */
4371     erts_init_trap_export(&ets_select_delete_continue_exp,
4372 			  am_ets, ERTS_MAKE_AM("select_delete_trap"), 1,
4373 			  &ets_select_delete_trap_1);
4374 
4375     /* Non visual BIF to trap to. */
4376     erts_init_trap_export(&ets_select_count_continue_exp,
4377 			  am_ets, ERTS_MAKE_AM("count_trap"), 1,
4378 			  &ets_select_count_1);
4379 
4380     /* Non visual BIF to trap to. */
4381     erts_init_trap_export(&ets_select_replace_continue_exp,
4382                           am_ets, ERTS_MAKE_AM("replace_trap"), 1,
4383                           &ets_select_replace_1);
4384 
4385     /* Non visual BIF to trap to. */
4386     erts_init_trap_export(&ets_select_continue_exp,
4387 			  am_ets, ERTS_MAKE_AM("select_trap"), 1,
4388 			  &ets_select_trap_1);
4389 
4390     /* Non visual BIF to trap to. */
4391     erts_init_trap_export(&ets_delete_continue_exp,
4392 			  am_ets, ERTS_MAKE_AM("delete_trap"), 1,
4393 			  &ets_delete_trap);
4394 
4395     /* ets:info(Tab, binary) trap... */
4396 
4397     ets_info_binary_trap = erts_export_put(am_erts_internal,
4398                                            am_ets_info_binary,
4399                                            1);
4400 }
4401 
4402 void
erts_ets_sched_spec_data_init(ErtsSchedulerData * esdp)4403 erts_ets_sched_spec_data_init(ErtsSchedulerData *esdp)
4404 {
4405     ErtsEtsAllYieldData *eaydp = ERTS_SCHED_AUX_YIELD_DATA(esdp, ets_all);
4406     eaydp->ongoing = NULL;
4407     eaydp->hfrag = NULL;
4408     eaydp->tab = NULL;
4409     eaydp->queue = NULL;
4410     esdp->ets_tables.clist = NULL;
4411     erts_atomic_init_nob(&esdp->ets_tables.count, 0);
4412 }
4413 
4414 
/* Try to hand over table 'tb', owned by the exiting process 'p', to the
** heir recorded in tb->common.heir.
**
** In: Table LCK_WRITE
** Return TRUE : ok, table not mine and NOT locked anymore.
** Return FALSE: failed, table still mine (LCK_WRITE)
**
** On success the heir becomes the owner and is sent an 'ETS-TRANSFER'
** message through send_ets_transfer_message().
*/
static int give_away_to_heir(Process* p, DbTable* tb)
{
    Process* to_proc;
    ErtsProcLocks to_locks = ERTS_PROC_LOCK_MAIN;
    Eterm to_pid;
    UWord heir_data;

    ASSERT(tb->common.owner == p->common.id);
    ASSERT(is_internal_pid(tb->common.heir));
    ASSERT(tb->common.heir != p->common.id);
retry:
    to_pid = tb->common.heir;
    /* First attempt: try-lock the heir while still holding the table. */
    to_proc = erts_pid2proc_opt(p, ERTS_PROC_LOCK_MAIN,
				to_pid, to_locks,
				ERTS_P2P_FLG_TRY_LOCK);
    if (to_proc == ERTS_PROC_LOCK_BUSY) {
	/*
	 * The heir's lock is busy: release the table, take the heir's
	 * lock with a blocking lookup, then re-lock the table. Owner and
	 * heir must be re-validated afterwards since the table was
	 * unlocked in between.
	 */
	db_unlock(tb,LCK_WRITE);
	to_proc = erts_pid2proc(p, ERTS_PROC_LOCK_MAIN,
				to_pid, to_locks);
	db_lock(tb,LCK_WRITE);
	ASSERT(tb != NULL);

	if (tb->common.owner != p->common.id) {
	    if (to_proc != NULL ) {
		erts_proc_unlock(to_proc, to_locks);
	    }
	    db_unlock(tb,LCK_WRITE);
	    return !0; /* ok, someone already gave my table away */
	}
	if (tb->common.heir != to_pid) {  /* someone changed the heir */
	    if (to_proc != NULL ) {
		erts_proc_unlock(to_proc, to_locks);
	    }
	    /*
	     * NOTE(review): this tests 'to_pid', i.e. the heir value read
	     * before the table was unlocked, not the updated
	     * tb->common.heir. Given the ASSERTs at function entry this
	     * looks like it cannot be true on the first pass -- confirm
	     * whether the new heir value was intended.
	     */
	    if (to_pid == p->common.id || to_pid == am_none) {
		return 0; /* no real heir, table still mine */
	    }
	    goto retry;
	}
    }
    if (to_proc == NULL) {
	return 0; /* heir not alive, table still mine */
    }
    /*
     * Compare the heir's start interval with the one stored in the table
     * to detect a different process that reuses the same pid.
     */
    if (to_proc->common.u.alive.started_interval
	!= tb->common.heir_started_interval) {
	erts_proc_unlock(to_proc, to_locks);
	return 0; /* heir dead and pid reused, table still mine */
    }

    /* Move the table from p's owned tables to the heir's. */
    delete_owned_table(p, tb);
    to_proc->flags |= F_USING_DB;
    tb->common.owner = to_pid;
    save_owned_table(to_proc, tb);

    db_unlock(tb,LCK_WRITE);
    /*
     * NOTE(review): heir_data is read after the table lock was released;
     * presumably safe because the heir's main lock is still held --
     * confirm.
     */
    heir_data = tb->common.heir_data;
    if (!is_immed(heir_data)) {
	/*
	 * Non-immediate heir data is stored as a pointer to a DbTerm whose
	 * tuple has arity 1; the actual data is its only element.
	 */
	Eterm* tpv = ((DbTerm*)heir_data)->tpl; /* tuple_val */
	ASSERT(arityval(*tpv) == 1);
	heir_data = tpv[1];
    }
    send_ets_transfer_message(p, to_proc, &to_locks, tb, heir_data);
    erts_proc_unlock(to_proc, to_locks);
    return !0;
}
4483 
4484 static void
send_ets_transfer_message(Process * c_p,Process * proc,ErtsProcLocks * locks,DbTable * tb,Eterm heir_data)4485 send_ets_transfer_message(Process *c_p, Process *proc,
4486                           ErtsProcLocks *locks,
4487                           DbTable *tb, Eterm heir_data)
4488 {
4489     Uint hsz, hd_sz;
4490     ErtsMessage *mp;
4491     Eterm *hp;
4492     ErlOffHeap *ohp;
4493     Eterm tid, hd_copy, msg, sender;
4494 
4495     hsz = 5;
4496     if (!is_table_named(tb))
4497         hsz += ERTS_MAGIC_REF_THING_SIZE;
4498     if (is_immed(heir_data))
4499         hd_sz = 0;
4500     else {
4501         hd_sz = size_object(heir_data);
4502         hsz += hd_sz;
4503     }
4504 
4505     mp = erts_alloc_message_heap(proc, locks, hsz, &hp, &ohp);
4506     if (is_table_named(tb))
4507         tid = tb->common.the_name;
4508     else
4509         tid = erts_mk_magic_ref(&hp, ohp, tb->common.btid);
4510     if (!hd_sz)
4511         hd_copy = heir_data;
4512     else
4513         hd_copy = copy_struct(heir_data, hd_sz, &hp, ohp);
4514     sender = c_p->common.id;
4515     msg = TUPLE4(hp, am_ETS_TRANSFER, tid, sender, hd_copy);
4516     ERL_MESSAGE_TOKEN(mp) = am_undefined;
4517     erts_queue_proc_message(c_p, proc, *locks, mp, msg);
4518 }
4519 
4520 
/*
 * Auto-release fixation from exiting process.
 *
 * Removes fixation 'fix', held by process 'p', from its table (when the
 * table still exists and is not marked DB_DELETE), then releases the
 * fixation's table reference and frees the fixation itself.
 *
 * Returns an amount of work done: 1 for freeing the fixation plus
 * whatever db_unfix_table_hash() reports when it is called.
 */
static SWord proc_cleanup_fixed_table(Process* p, DbFixation* fix)
{
    DbTable* tb = btid2tab(fix->tabs.btid);
    SWord work = 0;

    ASSERT(fix->procs.p == p); (void)p;
    if (tb) {
	db_lock(tb, LCK_WRITE_REC);
	if (!(tb->common.status & DB_DELETE)) {
	    erts_aint_t diff;
            int use_locks = !DB_LOCK_FREE(tb);

            if (use_locks)
                erts_mtx_lock(&tb->common.fixlock);

	    ASSERT(fixing_procs_rbt_lookup(tb->common.fixing_procs, p));

	    /* Drop all of this fixation's counts from the table at once. */
	    diff = -((erts_aint_t) fix->counter);
	    erts_refc_add(&tb->common.fix_count,diff,0);
	    fix->counter = 0;

	    fixing_procs_rbt_delete(&tb->common.fixing_procs, fix);

            if (use_locks)
                erts_mtx_unlock(&tb->common.fixlock);

	    /* A hash table that is no longer fixed gets its unfix pass. */
	    if (!IS_FIXED(tb) && IS_HASH_TABLE(tb->common.status)) {
		work += db_unfix_table_hash(&(tb->hash));
	    }

	    /* Account for the fixation memory on the table. */
	    ASSERT(sizeof(DbFixation) == ERTS_ALC_DBG_BLK_SZ(fix));
	    ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof(DbFixation), 0);
	}
	db_unlock(tb, LCK_WRITE_REC);
    }

    /* Done regardless of whether the table was found. */
    erts_bin_release(fix->tabs.btid);
    erts_free(ERTS_ALC_T_DB_FIXATION, fix);
    ERTS_ETS_MISC_MEM_ADD(-sizeof(DbFixation));
    ++work;

    return work;
}
4565 
4566 
/*
 * erts_db_process_exiting() is called when a process terminates.
 * It returns 0 when completely done, and !0 when it wants to
 * yield. *yield_state can hold a pointer to a state while
 * yielding.
 *
 * The work is organised as a small state machine:
 *   GET_OWNED_TABLE  - pick the next table owned by the process; give it
 *                      to its heir if possible, otherwise mark it for
 *                      deletion and continue with FREE_OWNED_TABLE.
 *   FREE_OWNED_TABLE - free the table's contents (may yield), then go
 *                      back to GET_OWNED_TABLE.
 *   UNFIX_TABLES     - release the process' table fixations one by one;
 *                      finishes when none are left.
 *
 * On the first call the state lives on the C stack; when the function
 * has to yield it is copied into an allocated block that is handed back
 * through *yield_state and freed again once everything is done.
 */

/* Aborts the emulator with a message naming this file, line and function. */
#define ERTS_DB_INTERNAL_ERROR(LSTR) \
  erts_exit(ERTS_ABORT_EXIT, "%s:%d:erts_db_process_exiting(): " LSTR "\n", \
	   __FILE__, __LINE__)

int
erts_db_process_exiting(Process *c_p, ErtsProcLocks c_p_locks, void **yield_state)
{
    typedef struct {
        enum {
            GET_OWNED_TABLE,
            FREE_OWNED_TABLE,
            UNFIX_TABLES,
        }op;
        DbTable *tb;
    } CleanupState;
    CleanupState *state = (CleanupState *) *yield_state;
    Eterm pid = c_p->common.id;
    CleanupState default_state;
    SWord initial_reds = ERTS_BIF_REDS_LEFT(c_p);
    SWord reds = initial_reds;

    /* No saved state: this is the first call for this process. */
    if (!state) {
	state = &default_state;
	state->op = GET_OWNED_TABLE;
        state->tb = NULL;
    }

    do {
	switch (state->op) {
        case GET_OWNED_TABLE: {
            DbTable* tb;
            erts_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);
            tb = (DbTable*) erts_psd_get(c_p, ERTS_PSD_ETS_OWNED_TABLES);
            erts_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);

            if (!tb) {
                /* Done with owned tables; now fixations */
                state->op = UNFIX_TABLES;
                break;
            }

            ASSERT(tb != state->tb);
            state->tb = tb;
            db_lock(tb, LCK_WRITE);
            /*
             *  Ownership may have changed since we looked up the table.
             */
            if (tb->common.owner != pid) {
                db_unlock(tb, LCK_WRITE);
                break;
            }
            /*
             * A successful give-away leaves the table unlocked and owned
             * by the heir; continue with the next owned table.
             */
            if (tb->common.heir != am_none
                && tb->common.heir != pid
                && give_away_to_heir(c_p, tb)) {
                break;
            }
            /* Clear all access bits. */
            tb->common.status &= ~(DB_PROTECTED | DB_PUBLIC | DB_PRIVATE);
            tb->common.status |= DB_DELETE;

            if (is_table_named(tb))
                remove_named_tab(tb, 0);

            free_heir_data(tb);
            reds -= free_fixations_locked(c_p, tb);
            tid_clear(c_p, tb);
            db_unlock(tb, LCK_WRITE);
            state->op = FREE_OWNED_TABLE;
            break;
        }
        case FREE_OWNED_TABLE:
            /* A negative result means the table is not fully freed yet. */
            reds = free_table_continue(c_p, state->tb, reds);
            if (reds < 0)
                goto yield;

            state->op = GET_OWNED_TABLE;
            break;

	case UNFIX_TABLES: {
	    DbFixation* fix;

            fix = (DbFixation*) erts_psd_get(c_p, ERTS_PSD_ETS_FIXED_TABLES);

            if (!fix) {
                /* Done */

                /* Free the state only if it was allocated by a yield. */
                if (state != &default_state)
                    erts_free(ERTS_ALC_T_DB_PROC_CLEANUP, state);
                *yield_state = NULL;

                BUMP_REDS(c_p, (initial_reds - reds));
                return 0;
            }

            fixed_tabs_delete(c_p, fix);
            reds -= proc_cleanup_fixed_table(c_p, fix);

            break;
        }
	default:
	    ERTS_DB_INTERNAL_ERROR("Bad internal state");
        }

    } while (reds > 0);

 yield:

    /* Move a stack-resident state into an allocated block before yielding. */
    if (state == &default_state) {
	*yield_state = erts_alloc(ERTS_ALC_T_DB_PROC_CLEANUP,
                                  sizeof(CleanupState));
	sys_memcpy(*yield_state, (void*) state, sizeof(CleanupState));
    }
    else
        ASSERT(state == *yield_state);

    return !0;
}
4690 
4691 
/*
 * Registers one more fixation of table 'tb' by process 'p'.
 *
 * The table's fix_count is always incremented. If 'p' already has a
 * fixation record for the table, only that record's counter is bumped.
 * Otherwise a new DbFixation is allocated, given its own reference to the
 * table identifier, and linked into both the table's set of fixing
 * processes and the process' set of fixed tables.
 *
 * SMP note: table only need to be LCK_READ locked
 */
static void fix_table_locked(Process* p, DbTable* tb)
{
    DbFixation *fix;
    int use_locks = !DB_LOCK_FREE(tb);

    if (use_locks)
        erts_mtx_lock(&tb->common.fixlock);

    erts_refc_inc(&tb->common.fix_count,1);
    fix = tb->common.fixing_procs;
    if (fix == NULL) {
	/* No process has the table fixed yet: record the time stamp. */
	tb->common.time.monotonic
	    = erts_get_monotonic_time(erts_proc_sched_data(p));
	tb->common.time.offset = erts_get_time_offset();
    }
    else {
	fix = fixing_procs_rbt_lookup(fix, p);
	if (fix) {
	    /* 'p' already fixes this table: just count one more. */
	    ASSERT(fixed_tabs_find(NULL, fix));
	    ++(fix->counter);
            if (use_locks)
                erts_mtx_unlock(&tb->common.fixlock);
	    return;
	}
    }
    /* First fixation of this table by 'p': create a new record. */
    fix = (DbFixation *) erts_db_alloc(ERTS_ALC_T_DB_FIXATION,
				       tb, sizeof(DbFixation));
    ERTS_ETS_MISC_MEM_ADD(sizeof(DbFixation));
    fix->tabs.btid = tb->common.btid;
    erts_refc_inc(&fix->tabs.btid->intern.refc, 2);
    fix->procs.p = p;
    fix->counter = 1;
    fixing_procs_rbt_insert(&tb->common.fixing_procs, fix);

    if (use_locks)
        erts_mtx_unlock(&tb->common.fixlock);

    p->flags |= F_USING_DB;

    fixed_tabs_insert(p, fix);
}
4734 
/*
 * Releases one fixation of table 'tb' held by process 'p'.
 *
 * If 'p' has a fixation record, the table's fix_count and the record's
 * counter are decremented; a record that reaches zero is unlinked from
 * the table and the process and freed. When the table ends up unfixed,
 * is a hash table and has entries on its 'fixdel' list,
 * db_unfix_table_hash() is called.
 *
 * SMP note: May re-lock table
 * '*kind_p' is the lock kind the caller holds; it is changed to
 * LCK_WRITE when the read lock is swapped for a write lock below.
 */
static void unfix_table_locked(Process* p,  DbTable* tb,
			       db_lock_kind_t* kind_p)
{
    DbFixation* fix;
    int use_locks = !DB_LOCK_FREE(tb);

    if (use_locks)
        erts_mtx_lock(&tb->common.fixlock);

    fix = fixing_procs_rbt_lookup(tb->common.fixing_procs, p);

    if (fix) {
	erts_refc_dec(&tb->common.fix_count,0);
	--(fix->counter);
	ASSERT(fix->counter >= 0);
	if (fix->counter == 0) {
	    /* Last fixation by 'p': drop the record entirely. */
	    fixing_procs_rbt_delete(&tb->common.fixing_procs, fix);
            if (use_locks)
                erts_mtx_unlock(&tb->common.fixlock);
	    fixed_tabs_delete(p, fix);

	    erts_refc_dec(&fix->tabs.btid->intern.refc, 1);

	    erts_db_free(ERTS_ALC_T_DB_FIXATION,
			 tb, (void *) fix, sizeof(DbFixation));
	    ERTS_ETS_MISC_MEM_ADD(-sizeof(DbFixation));
	    /* fixlock was already released above. */
	    goto unlocked;
	}
    }
    if (use_locks)
        erts_mtx_unlock(&tb->common.fixlock);
unlocked:

    if (!IS_FIXED(tb) && IS_HASH_TABLE(tb->common.status)
	&& erts_atomic_read_nob(&tb->hash.fixdel) != (erts_aint_t)NULL) {
	if (*kind_p == LCK_READ && tb->common.is_thread_safe) {
	    /* Must have write lock while purging pseudo-deleted (OTP-8166) */
            if (use_locks) {
                erts_rwmtx_runlock(&tb->common.rwlock);
                erts_rwmtx_rwlock(&tb->common.rwlock);
            }
	    *kind_p = LCK_WRITE;
	    /*
	     * The table was unlocked for a moment; skip the purge if it
	     * is now being deleted or is marked busy.
	     */
	    if (tb->common.status & (DB_DELETE|DB_BUSY))
                return;
	}
	db_unfix_table_hash(&(tb->hash));
    }
}
4785 
/*
 * Context handed to free_fixations_op() for every DbFixation visited by
 * free_fixations_locked().
 */
struct free_fixations_ctx
{
    Process* p;   /* Process passed to free_fixations_locked(); its own
                   * fixations are freed directly. */
    DbTable* tb;  /* Table whose fixations are being removed. */
    SWord cnt;    /* Number of fixations processed so far. */
};
4792 
/*
 * Callback for fixing_procs_rbt_foreach_destroy(): dispose of one
 * DbFixation of a table that is being deleted.
 *
 * The fixation's counter is subtracted from the table wide fix_count.
 * A fixation owned by the process in the context is unlinked and freed
 * right away. A fixation owned by any other process is handed over to
 * that process through erts_schedule_ets_free_fixation().
 *
 * 'vctx' points to a struct free_fixations_ctx whose 'cnt' is
 * incremented once per call. 'reds' is not used. Always returns 1.
 */
static int free_fixations_op(DbFixation* fix, void* vctx, Sint reds)
{
    struct free_fixations_ctx* fctx = (struct free_fixations_ctx*) vctx;
    DbTable* tb = fctx->tb;
    Process* fixer = fix->procs.p;
    erts_aint_t delta = -((erts_aint_t) fix->counter);

    ASSERT(btid2tab(fix->tabs.btid) == tb);
    ASSERT(fix->counter > 0);
    ASSERT(tb->common.status & DB_DELETE);

    erts_refc_add(&tb->common.fix_count, delta, 0);

    if (fixer == fctx->p) {
        /* Our own fixation: unlink and free it here. */
        fixed_tabs_delete(fixer, fix);

        erts_bin_release(fix->tabs.btid);

        erts_db_free(ERTS_ALC_T_DB_FIXATION,
                     tb, (void *) fix, sizeof(DbFixation));
        ERTS_ETS_MISC_MEM_ADD(-sizeof(DbFixation));
    }
    else {
        /* Fixated by other process */
        fix->counter = 0;

        /* Fake memory stats for table */
        ASSERT(sizeof(DbFixation) == ERTS_ALC_DBG_BLK_SZ(fix));
        ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof(DbFixation), 0);

        erts_schedule_ets_free_fixation(fixer->common.id, fix);
        /*
         * Either sys task is scheduled and erts_db_execute_free_fixation()
         * will remove 'fix' or process will exit, drop sys task and
         * proc_cleanup_fixed_table() will remove 'fix'.
         */
    }

    fctx->cnt++;
    return 1;
}
4832 
/*
 * Free a DbFixation whose counter has already been cleared.
 *
 * Removes 'fix' from the fixed tables of 'p', releases the fixation's
 * reference to the table identifier and frees the block with erts_free()
 * (no per-table memory accounting is done here). Always returns 1.
 */
int erts_db_execute_free_fixation(Process* p, DbFixation* fix)
{
    ASSERT(fix->counter == 0);
    fixed_tabs_delete(p, fix);

    erts_bin_release(fix->tabs.btid);

    erts_free(ERTS_ALC_T_DB_FIXATION, fix);
    ERTS_ETS_MISC_MEM_ADD(-sizeof(DbFixation));
    return 1;
}
4844 
free_fixations_locked(Process * p,DbTable * tb)4845 static SWord free_fixations_locked(Process* p, DbTable *tb)
4846 {
4847     struct free_fixations_ctx ctx;
4848 
4849     ERTS_LC_ASSERT(DB_LOCK_FREE(tb) || erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
4850 
4851     ctx.p = p;
4852     ctx.tb = tb;
4853     ctx.cnt = 0;
4854     fixing_procs_rbt_foreach_destroy(&tb->common.fixing_procs,
4855                                      free_fixations_op, &ctx);
4856     tb->common.fixing_procs = NULL;
4857     return ctx.cnt;
4858 }
4859 
/*
 * Install 'heir' and 'heir_data' on table 'tb'.
 *
 * 'heir' is either am_none or a process identifier. If the identified
 * process cannot be found, the table ends up with heir am_none.
 * When a heir is installed, its started_interval is recorded in the table.
 *
 * 'heir_data' is stored as is when it is an immediate term. Otherwise it
 * is wrapped in a 1-tuple and copied into a DbTerm allocated with
 * ERTS_ALC_T_DB_HEIR_DATA; a pointer to that DbTerm is stored instead.
 * free_heir_data() releases such a copy.
 *
 * When the table gets no heir (heir is am_none or the heir process is
 * already gone), tb->common.heir_data is left untouched.
 */
static void set_heir(Process* me, DbTable* tb, Eterm heir, UWord heir_data)
{
    tb->common.heir = heir;
    if (heir == am_none) {
	return;
    }
    if (heir == me->common.id) {
	erts_ensure_later_proc_interval(me->common.u.alive.started_interval);
	tb->common.heir_started_interval = me->common.u.alive.started_interval;
    }
    else {
	Process* heir_proc = erts_proc_lookup(heir);
	if (heir_proc == NULL) {
	    /*
	     * The heir is already gone, so the table gets no heir. Return
	     * before copying 'heir_data': free_heir_data() only releases
	     * the copy when the heir is not am_none, so a copy made for a
	     * table without heir would never be freed.
	     */
	    tb->common.heir = am_none;
	    return;
	}
	erts_ensure_later_proc_interval(heir_proc->common.u.alive.started_interval);
	tb->common.heir_started_interval = heir_proc->common.u.alive.started_interval;
    }

    if (!is_immed(heir_data)) {
	DeclareTmpHeap(tmp,2,me);
	Eterm wrap_tpl;
	int size;
	DbTerm* dbterm;
	Eterm* top;
	ErlOffHeap tmp_offheap;

	UseTmpHeap(2,me);
	/* Make a dummy 1-tuple around data to use DbTerm */
	wrap_tpl = TUPLE1(tmp,heir_data);
	size = size_object(wrap_tpl);
	/* DbTerm already contains room for one Eterm, hence size-1. */
	dbterm = erts_db_alloc(ERTS_ALC_T_DB_HEIR_DATA, (DbTable *)tb,
			       (sizeof(DbTerm) + sizeof(Eterm)*(size-1)));
	dbterm->size = size;
	top = dbterm->tpl;
	tmp_offheap.first  = NULL;
	copy_struct(wrap_tpl, size, &top, &tmp_offheap);
	dbterm->first_oh = tmp_offheap.first;
	heir_data = (UWord)dbterm;
	UnUseTmpHeap(2,me);
	ASSERT(!is_immed(heir_data));
    }
    tb->common.heir_data = heir_data;
}
4905 
/*
 * Release the heir data of table 'tb'.
 *
 * Only a table that has a heir and whose heir data is not an immediate
 * term owns a DbTerm copy; that copy is cleaned up and freed here.
 * In DEBUG builds the heir_data field is overwritten with am_undefined.
 */
static void free_heir_data(DbTable* tb)
{
    const int owns_copy = (tb->common.heir != am_none
                           && !is_immed(tb->common.heir_data));

    if (owns_copy) {
        DbTerm* wrapped = (DbTerm*) tb->common.heir_data;
        Uint nbytes;

        db_cleanup_offheap_comp(wrapped);
        /* Same size computation as used when the copy was allocated. */
        nbytes = sizeof(DbTerm) + (wrapped->size-1)*sizeof(Eterm);
        erts_db_free(ERTS_ALC_T_DB_HEIR_DATA, tb, (void *)wrapped, nbytes);
    }
#ifdef DEBUG
    tb->common.heir_data = am_undefined;
#endif
}
4918 
ets_delete_trap(BIF_ALIST_1)4919 static BIF_RETTYPE ets_delete_trap(BIF_ALIST_1)
4920 {
4921     SWord initial_reds = ERTS_BIF_REDS_LEFT(BIF_P);
4922     SWord reds = initial_reds;
4923     Eterm cont = BIF_ARG_1;
4924     Eterm* ptr = big_val(cont);
4925     DbTable *tb = *((DbTable **) (UWord) (ptr + 1));
4926 
4927     ASSERT(*ptr == make_pos_bignum_header(1));
4928 
4929     reds = free_table_continue(BIF_P, tb, reds);
4930     if (reds < 0) {
4931         BUMP_ALL_REDS(BIF_P);
4932         BIF_TRAP1(&ets_delete_continue_exp, BIF_P, cont);
4933     }
4934     else {
4935         BUMP_REDS(BIF_P, (initial_reds - reds));
4936 	BIF_RET(am_true);
4937     }
4938 }
4939 
4940 
4941 /*
4942  * free_table_continue() returns reductions left
4943  * done if >= 0
4944  * yield if < 0
4945  */
free_table_continue(Process * p,DbTable * tb,SWord reds)4946 static SWord free_table_continue(Process *p, DbTable *tb, SWord reds)
4947 {
4948     reds = tb->common.meth->db_free_table_continue(tb, reds);
4949 
4950     if (reds < 0) {
4951 #ifdef HARDDEBUG
4952 	erts_fprintf(stderr,"ets: free_table_cont %T (continue begin)\r\n",
4953 		     tb->common.id);
4954 #endif
4955 	/* More work to be done. Let other processes work and call us again. */
4956     }
4957     else {
4958 #ifdef HARDDEBUG
4959 	erts_fprintf(stderr,"ets: free_table_cont %T (continue end)\r\n",
4960 		     tb->common.id);
4961 #endif
4962 	/* Completely done - we will not get called again. */
4963         delete_owned_table(p, tb);
4964         table_dec_refc(tb, 0);
4965     }
4966     return reds;
4967 }
4968 
/*
 * Context handed to fixing_procs_info_op() while the fixations of a
 * table are traversed.
 */
struct fixing_procs_info_ctx
{
    Process* p;   /* Process on whose heap the result is built. */
    Eterm list;   /* Result list built so far; starts out as NIL. */
};
4974 
/*
 * Traversal callback: prepend {Pid, Counter} for fixation 'fix' to the
 * list kept in the struct fixing_procs_info_ctx that 'vctx' points to.
 *
 * Five heap words are allocated per call: three for the 2-tuple and two
 * for the list cell. 'reds' is not used. Always returns 1.
 */
static int fixing_procs_info_op(DbFixation* fix, void* vctx, Sint reds)
{
    struct fixing_procs_info_ctx* info = (struct fixing_procs_info_ctx*) vctx;
    Eterm* tuple_hp = HAllocX(info->p, 5, 100);
    Eterm* cons_hp = tuple_hp + 3;
    Eterm pair;

    pair = TUPLE2(tuple_hp, fix->procs.p->common.id, make_small(fix->counter));
    info->list = CONS(cons_hp, pair, info->list);
    return 1;
}
4987 
/*
 * Look up the information item 'What' for table 'tb'.
 *
 * 'p' is the process on whose heap non-immediate results are built.
 * Items whose value is an atom or other immediate (e.g. type,
 * protection, compressed) do not allocate; print_table() relies on this
 * when it calls table_info() with a NULL process.
 *
 * Returns the value of the item, or THE_NON_VALUE if 'What' is not a
 * recognized item (or, for 'protection', if no protection bit is set).
 */
static Eterm table_info(Process* p, DbTable* tb, Eterm What)
{
    Eterm ret = THE_NON_VALUE;
    int use_monotonic;

    if (What == am_size) {
        /* Approximate number of objects in the table. */
        Uint size = (Uint) (DB_GET_APPROX_NITEMS(tb));
        ret = erts_make_integer(size, p);
    } else if (What == am_type) {
	if (tb->common.status & DB_SET)  {
	    ret = am_set;
	} else if (tb->common.status & DB_DUPLICATE_BAG) {
	    ret = am_duplicate_bag;
	} else if (tb->common.status & DB_ORDERED_SET) {
	    ret = am_ordered_set;
	} else if (tb->common.status & DB_CA_ORDERED_SET) {
	    /* Reported as a plain ordered_set. */
	    ret = am_ordered_set;
	} else { /*TT*/
	    ASSERT(tb->common.status & DB_BAG);
	    ret = am_bag;
	}
    } else if (What == am_memory) {
	/* Approximate memory in bytes, rounded up to whole words. */
	Uint words = (Uint) ((DB_GET_APPROX_MEM_CONSUMED(tb)
			      + sizeof(Uint)
			      - 1)
			     / sizeof(Uint));
	ret = erts_make_integer(words, p);
    } else if (What == am_owner) {
	ret = tb->common.owner;
    } else if (What == am_heir) {
	ret = tb->common.heir;
    } else if (What == am_protection) {
	if (tb->common.status & DB_PRIVATE)
	    ret = am_private;
	else if (tb->common.status & DB_PROTECTED)
	    ret = am_protected;
	else if (tb->common.status & DB_PUBLIC)
	    ret = am_public;
    } else if (What == am_write_concurrency) {
        ret = tb->common.status & DB_FINE_LOCKED ? am_true : am_false;
    } else if (What == am_read_concurrency) {
        ret = tb->common.status & DB_FREQ_READ ? am_true : am_false;
    } else if (What == am_name) {
	ret = tb->common.the_name;
    } else if (What == am_keypos) {
	ret = make_small(tb->common.keypos);
    } else if (What == am_node) {
	ret = erts_this_dist_entry->sysname;
    } else if (What == am_named_table) {
	ret = is_table_named(tb) ? am_true : am_false;
    } else if (What == am_compressed) {
	ret = tb->common.compress ? am_true : am_false;
    } else if (What == am_id) {
        ret = make_tid(p, tb);
    } else if (What == am_decentralized_counters) {
        ret = tb->common.counters.is_decentralized ? am_true : am_false;
    }

    /*
     * For debugging purposes
     */
    else if (What == am_data) {
	/* Side effect only: dump the table to stdout. */
	print_table(ERTS_PRINT_STDOUT, NULL, 1, tb);
	ret = am_true;
    } else if (ERTS_IS_ATOM_STR("fixed",What)) {
	if (IS_FIXED(tb))
	    ret = am_true;
	else
	    ret = am_false;
    } else if ((use_monotonic
		= ERTS_IS_ATOM_STR("safe_fixed_monotonic_time",
				   What))
	       || ERTS_IS_ATOM_STR("safe_fixed", What)) {
	/*
	 * Result is 'false' or {Time, [{Pid, Counter}]}. Time is an
	 * integer for safe_fixed_monotonic_time and a 3-tuple of
	 * integers for safe_fixed. The fixation state is read under
	 * fixlock unless the table is lock free.
	 */
        if (!DB_LOCK_FREE(tb))
            erts_mtx_lock(&tb->common.fixlock);
	if (IS_FIXED(tb)) {
	    Uint need;
	    Eterm *hp;
	    Eterm time;
	    Sint64 mtime;
	    struct fixing_procs_info_ctx ctx;

	    need = 3; /* The outer 2-tuple. */
	    if (use_monotonic) {
		mtime = (Sint64) tb->common.time.monotonic;
		mtime += ERTS_MONOTONIC_OFFSET_NATIVE;
		if (!IS_SSMALL(mtime))
		    need += ERTS_SINT64_HEAP_SIZE(mtime);
	    }
	    else {
		mtime = 0;
		need += 4; /* The 3-tuple holding the time. */
	    }
	    ctx.p = p;
	    ctx.list = NIL;
	    /* Each visited fixation allocates its own heap words. */
	    fixing_procs_rbt_foreach(tb->common.fixing_procs,
				     fixing_procs_info_op,
				     &ctx);

	    hp = HAlloc(p, need);
	    if (use_monotonic)
		time = (IS_SSMALL(mtime)
		       ? make_small(mtime)
		       : erts_sint64_to_big(mtime, &hp));
	    else {
		Uint ms, s, us;
		erts_make_timestamp_value(&ms, &s, &us,
					  tb->common.time.monotonic,
					  tb->common.time.offset);
		time = TUPLE3(hp, make_small(ms), make_small(s), make_small(us));
		hp += 4;
	    }
	    ret = TUPLE2(hp, time, ctx.list);
	} else {
	    ret = am_false;
	}
        if (!DB_LOCK_FREE(tb))
            erts_mtx_unlock(&tb->common.fixlock);
    } else if (ERTS_IS_ATOM_STR("stats",What)) {
	if (IS_HASH_TABLE(tb->common.status)) {
	    FloatDef f;
	    DbHashStats stats;
	    Eterm avg, std_dev_real, std_dev_exp;
	    Eterm* hp;

	    db_calc_stats_hash(&tb->hash, &stats);
	    /* One 7-tuple (header + 7 elements) and three floats. */
	    hp = HAlloc(p, 1 + 7 + FLOAT_SIZE_OBJECT*3);
	    f.fd = stats.avg_chain_len;
	    avg = make_float(hp);
	    PUT_DOUBLE(f, hp);
	    hp += FLOAT_SIZE_OBJECT;

	    f.fd = stats.std_dev_chain_len;
	    std_dev_real = make_float(hp);
	    PUT_DOUBLE(f, hp);
	    hp += FLOAT_SIZE_OBJECT;

	    f.fd = stats.std_dev_expected;
	    std_dev_exp = make_float(hp);
	    PUT_DOUBLE(f, hp);
	    hp += FLOAT_SIZE_OBJECT;
	    ret = TUPLE7(hp, make_small(erts_atomic_read_nob(&tb->hash.nactive)),
			 avg, std_dev_real, std_dev_exp,
			 make_small(stats.min_chain_len),
			 make_small(stats.max_chain_len),
			 make_small(stats.kept_items));
	}
	else if (IS_CATREE_TABLE(tb->common.status)) {
            DbCATreeStats stats;
            Eterm* hp;

            db_calc_stats_catree(&tb->catree, &stats);
            /* One 3-tuple (header + 3 elements). */
            hp = HAlloc(p, 4);
            ret = TUPLE3(hp,
                         make_small(stats.route_nodes),
                         make_small(stats.base_nodes),
                         make_small(stats.max_depth));

        }
        else
	    ret = am_false;
    }
    return ret;
}
5152 
/*
 * Print a textual summary of table 'tb' through the print function 'to'
 * with argument 'to_arg'.
 *
 * The table is identified by its name when it is a named table and
 * otherwise by a reference built in a local buffer. 'show' is passed on
 * unchanged to the table type's db_print() callback.
 */
static void print_table(fmtfn_t to, void *to_arg, int show,  DbTable* tb)
{
    Eterm tid;
    Eterm heap[ERTS_MAGIC_REF_THING_SIZE];

    if (is_table_named(tb)) {
        tid = tb->common.the_name;
    } else {
        /* Build a temporary reference term that is only used for printing. */
        ErlOffHeap oh;
        ERTS_INIT_OFF_HEAP(&oh);
        write_magic_ref_thing(heap, &oh, (ErtsMagicBinary *) tb->common.btid);
        tid = make_internal_ref(heap);
    }

    erts_print(to, to_arg, "Table: %T\n", tid);
    erts_print(to, to_arg, "Name: %T\n", tb->common.the_name);

    tb->common.meth->db_print(to, to_arg, show, tb);

    /*
     * Print the object count as a full width unsigned word, the same way
     * table_info() reports the 'size' item. Casting it to int would
     * truncate the count of very large tables.
     */
    erts_print(to, to_arg, "Objects: %bpu\n",
	       (Uint) DB_GET_APPROX_NITEMS(tb));
    /* Approximate memory in bytes, rounded up to whole words. */
    erts_print(to, to_arg, "Words: %bpu\n",
	       (Uint) ((DB_GET_APPROX_MEM_CONSUMED(tb)
			+ sizeof(Uint)
			- 1)
		       / sizeof(Uint)));
    /* These items are immediates, so no process heap is needed. */
    erts_print(to, to_arg, "Type: %T\n", table_info(NULL, tb, am_type));
    erts_print(to, to_arg, "Protection: %T\n", table_info(NULL, tb, am_protection));
    erts_print(to, to_arg, "Compressed: %T\n", table_info(NULL, tb, am_compressed));
    erts_print(to, to_arg, "Write Concurrency: %T\n", table_info(NULL, tb, am_write_concurrency));
    erts_print(to, to_arg, "Read Concurrency: %T\n", table_info(NULL, tb, am_read_concurrency));
}
5184 
/* Arguments carried from db_info() to db_info_print() for each table. */
typedef struct {
    fmtfn_t to;     /* Print function. */
    void *to_arg;   /* Argument passed along to the print function. */
    int show;       /* Passed on unchanged to print_table(). */
} ErtsPrintDbInfo;
5190 
/*
 * Per-table callback used by db_info(): print an "=ets:<owner>" header
 * line and a "Slot:" line holding the table's address, followed by the
 * output of print_table(). 'vpdbip' points to an ErtsPrintDbInfo.
 */
static void
db_info_print(DbTable *tb, void *vpdbip)
{
    ErtsPrintDbInfo *pdbip = (ErtsPrintDbInfo *) vpdbip;
    erts_print(pdbip->to, pdbip->to_arg, "=ets:%T\n", tb->common.owner);
    erts_print(pdbip->to, pdbip->to_arg, "Slot: %bpu\n", (Uint) tb);
    print_table(pdbip->to, pdbip->to_arg, pdbip->show, tb);
}
5199 
/*
 * Print information about every table that is alive, using the print
 * function 'to' with argument 'to_arg'. 'show' is forwarded to
 * print_table() for each table.
 */
void db_info(fmtfn_t to, void *to_arg, int show)    /* Called by break handler */
{
    ErtsPrintDbInfo pdbi;

    pdbi.to = to;
    pdbi.to_arg = to_arg;
    pdbi.show = show;

    /* Last argument is 'alive_only': skip tables that are not alive. */
    erts_db_foreach_table(db_info_print, &pdbi, !0);
}
5210 
/*
 * Return the current value of the erts_ets_misc_mem_size counter.
 * A full memory barrier is issued before the counter is read without
 * any barrier of its own.
 */
Uint
erts_get_ets_misc_mem_size(void)
{
    ERTS_THR_MEMORY_BARRIER;
    /* Memory not allocated in ets_alloc */
    return (Uint) erts_atomic_read_nob(&erts_ets_misc_mem_size);
}
5218 
5219 /* SMP Note: May only be used when system is locked */
5220 void
erts_db_foreach_table(void (* func)(DbTable *,void *),void * arg,int alive_only)5221 erts_db_foreach_table(void (*func)(DbTable *, void *), void *arg, int alive_only)
5222 {
5223     int ix;
5224 
5225     ASSERT(erts_thr_progress_is_blocking());
5226 
5227     for (ix = 0; ix < erts_no_schedulers; ix++) {
5228         ErtsSchedulerData *esdp = ERTS_SCHEDULER_IX(ix);
5229         DbTable *first = esdp->ets_tables.clist;
5230         if (first) {
5231             DbTable *tb = first;
5232             do {
5233                 if (!alive_only || is_table_alive(tb))
5234                     (*func)(tb, arg);
5235                 tb = tb->common.all.next;
5236             } while (tb != first);
5237         }
5238     }
5239 }
5240 
/*
 * Forward to the db_foreach_offheap() callback of the table type of
 * 'tb', passing 'func' and 'arg' along unchanged.
 *
 * SMP Note: May only be used when system is locked
 */
void
erts_db_foreach_offheap(DbTable *tb,
			void (*func)(ErlOffHeap *, void *),
			void *arg)
{
    tb->common.meth->db_foreach_offheap(tb, func, arg);
}
5249 
/*
 * Forward 'func' and 'arg' to the hash, tree and catree variants of
 * erts_db_foreach_thr_prgr_offheap_*(), in that order.
 */
void
erts_db_foreach_thr_prgr_offheap(void (*func)(ErlOffHeap *, void *),
                                 void *arg)
{
    erts_db_foreach_thr_prgr_offheap_hash(func, arg);
    erts_db_foreach_thr_prgr_offheap_tree(func, arg);
    erts_db_foreach_thr_prgr_offheap_catree(func, arg);
}
5258 
5259 /* retrieve max number of ets tables */
5260 Uint
erts_db_get_max_tabs()5261 erts_db_get_max_tabs()
5262 {
5263     return db_max_tabs;
5264 }
5265 
erts_ets_table_count(void)5266 Uint erts_ets_table_count(void)
5267 {
5268     Uint tb_count = 0;
5269     Uint six;
5270 
5271     for (six = 0; six < erts_no_schedulers; six++) {
5272         ErtsSchedulerData *esdp = &erts_aligned_scheduler_data[six].esd;
5273         tb_count += erts_atomic_read_nob(&esdp->ets_tables.count);
5274     }
5275     return tb_count;
5276 }
5277 
5278 /*
5279  * For testing of meta tables only.
5280  *
5281  * Given a name atom (as returned from ets:new/2), return a list of 'cnt'
5282  * number of other names that will hash to the same bucket in meta_name_tab.
5283  *
5284  * WARNING: Will bloat the atom table!
5285  */
5286 Eterm
erts_ets_colliding_names(Process * p,Eterm name,Uint cnt)5287 erts_ets_colliding_names(Process* p, Eterm name, Uint cnt)
5288 {
5289     Eterm list = NIL;
5290     Eterm* hp = HAlloc(p,cnt*2);
5291     Uint index = atom_val(name) & meta_name_tab_mask;
5292 
5293     while (cnt) {
5294         if (index != atom_val(name)) {
5295             while (index >= atom_table_size()) {
5296                 char tmp[20];
5297                 erts_snprintf(tmp, sizeof(tmp), "am%x", atom_table_size());
5298                 erts_atom_put((byte *) tmp, sys_strlen(tmp), ERTS_ATOM_ENC_LATIN1, 1);
5299             }
5300             list = CONS(hp, make_atom(index), list);
5301             hp += 2;
5302             --cnt;
5303         }
5304         index += meta_name_tab_mask + 1;
5305     }
5306     return list;
5307 }
5308 
5309 #ifdef ERTS_ENABLE_LOCK_COUNT
5310 
/*
 * Turn lock counting on ('enable' non-zero) or off for the locks of
 * table 'tb': the table rwlock, the fixlock and the locks specific to
 * hash and catree tables. Does nothing for a lock free table.
 */
void erts_lcnt_enable_db_lock_count(DbTable *tb, int enable) {
    if (DB_LOCK_FREE(tb)) {
        return;
    }

    if (!enable) {
        erts_lcnt_uninstall(&tb->common.rwlock.lcnt);
        erts_lcnt_uninstall(&tb->common.fixlock.lcnt);
    }
    else {
        erts_lcnt_install_new_lock_info(&tb->common.rwlock.lcnt, "db_tab",
            tb->common.the_name, ERTS_LOCK_TYPE_RWMUTEX | ERTS_LOCK_FLAGS_CATEGORY_DB);
        erts_lcnt_install_new_lock_info(&tb->common.fixlock.lcnt, "db_tab_fix",
            tb->common.the_name, ERTS_LOCK_TYPE_MUTEX | ERTS_LOCK_FLAGS_CATEGORY_DB);
    }

    /* Table type specific locks. */
    if (IS_HASH_TABLE(tb->common.status)) {
        erts_lcnt_enable_db_hash_lock_count(&tb->hash, enable);
        return;
    }
    if (IS_CATREE_TABLE(tb->common.status)) {
        /* erts_lcnt_enable_db_catree_lock_count is not thread safe so
           the table needs to get locked */
        db_lock(tb, LCK_WRITE);
        erts_lcnt_enable_db_catree_lock_count(&tb->catree, enable);
        db_unlock(tb, LCK_WRITE);
    }
}
5334 
lcnt_update_db_locks_per_sched(void * enable)5335 static void lcnt_update_db_locks_per_sched(void *enable) {
5336     ErtsSchedulerData *esdp;
5337     DbTable *head;
5338 
5339     esdp = erts_get_scheduler_data();
5340     head = esdp->ets_tables.clist;
5341 
5342     if(head) {
5343         DbTable *iterator = head;
5344 
5345         do {
5346             if(is_table_alive(iterator)) {
5347                 erts_lcnt_enable_db_lock_count(iterator, !!enable);
5348             }
5349 
5350             iterator = iterator->common.all.next;
5351         } while (iterator != head);
5352     }
5353 }
5354 
/*
 * Schedule lcnt_update_db_locks_per_sched() as misc aux work with the
 * range arguments 0 and erts_no_schedulers (presumably so that it runs
 * on every scheduler -- confirm against
 * erts_schedule_multi_misc_aux_work()). The 'enable' flag is passed as
 * the callback's pointer argument.
 */
void erts_lcnt_update_db_locks(int enable) {
    erts_schedule_multi_misc_aux_work(0, erts_no_schedulers,
        &lcnt_update_db_locks_per_sched, (void*)(UWord)enable);
}
5359 #endif /* ERTS_ENABLE_LOCK_COUNT */
5360 
#ifdef ETS_DBG_FORCE_TRAP
/*
 * Debug switch that only exists when ETS_DBG_FORCE_TRAP is defined;
 * off (0) by default. NOTE(review): presumably makes ETS operations
 * trap more often for testing -- confirm at the sites that read it.
 */
int erts_ets_dbg_force_trap = 0;
#endif
5364 
/*
 * Call db_catree_force_split() with 'on' for the catree table
 * identified by 'tid'.
 *
 * Returns 0 if 'tid' does not identify a table or the table is not a
 * catree table. Otherwise the table is write locked, the call is made
 * unless the table is marked DB_DELETE, and 1 is returned.
 */
int erts_ets_force_split(Eterm tid, int on)
{
    Eterm ignored;
    DbTable* tb = tid2tab(tid, &ignored);

    if (tb == NULL) {
        return 0;
    }
    if (!IS_CATREE_TABLE(tb->common.type)) {
        return 0;
    }

    db_lock(tb, LCK_WRITE);
    if ((tb->common.status & DB_DELETE) == 0) {
        db_catree_force_split(&tb->catree, on);
    }
    db_unlock(tb, LCK_WRITE);

    return 1;
}
5378 
/*
 * Call db_catree_debug_random_split_join() with 'on' for the catree
 * table identified by 'tid'.
 *
 * Returns 0 if 'tid' does not identify a table or the table is not a
 * catree table. Otherwise the table is write locked, the call is made
 * unless the table is marked DB_DELETE, and 1 is returned.
 */
int erts_ets_debug_random_split_join(Eterm tid, int on)
{
    Eterm ignored;
    DbTable* tb = tid2tab(tid, &ignored);

    if (tb == NULL) {
        return 0;
    }
    if (!IS_CATREE_TABLE(tb->common.type)) {
        return 0;
    }

    db_lock(tb, LCK_WRITE);
    if ((tb->common.status & DB_DELETE) == 0) {
        db_catree_debug_random_split_join(&tb->catree, on);
    }
    db_unlock(tb, LCK_WRITE);

    return 1;
}
5392